Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 15:21:29 UTC

[pulsar] branch branch-2.9 updated (91a2b12 -> b55bba3)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 91a2b12  [Test] Cleanup ProxyPublishConsumeTest (#12607)
     new 6e884cc  Clean up the metadata of the non-persistent partitioned topics. (#12550)
     new 588dbaa  [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)
     new 47b8e44  modify check waitingForPingResponse with volatile (#12615)
     new 4421d17  Fix String should use equals but not ==. (#12619)
     new 5e7f9cd  Remove wrong visible testing annotation in function workers (#12621)
     new 1aa80b7  [pulsar-broker] Add git branch information for PulsarVersion (#12541)
     new e46a6f7  Add @Test annotation to test methods (#12640)
     new 0ca7151  Upgrade debezium to 1.7.1 (#12644)
     new 3f809e6  [Broker] Optimize exception information for schemas (#12647)
     new af1c194  Close Zk database on unit tests (#12649)
     new b55bba3  Fix cherry-pick check style issue

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../mledger/impl/NonDurableCursorImpl.java         | 10 +++
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  2 +-
 pom.xml                                            |  2 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  1 +
 .../broker/admin/impl/SchemasResourceBase.java     | 11 ++--
 .../AbstractDispatcherSingleActiveConsumer.java    | 10 ++-
 ...onPersistentDispatcherSingleActiveConsumer.java |  2 +-
 .../service/nonpersistent/NonPersistentTopic.java  | 19 ++++++
 .../PersistentDispatcherSingleActiveConsumer.java  |  4 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  1 +
 .../BrokerInterceptorWithClassLoaderTest.java      |  1 +
 .../ProtocolHandlerWithClassLoaderTest.java        |  1 +
 .../broker/service/NonPersistentTopicE2ETest.java  | 14 +++++
 .../broker/stats/ManagedCursorMetricsTest.java     |  1 +
 .../client/api/SimpleProducerConsumerStatTest.java |  1 +
 .../pulsar/compaction/CompactedTopicTest.java      | 73 ++++++++++++++++++++++
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  1 +
 .../apache/pulsar/admin/cli/TestCmdClusters.java   |  3 +-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |  6 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |  6 +-
 .../org/apache/pulsar/PulsarVersion.java           |  4 ++
 .../pulsar/common/protocol/PulsarHandler.java      |  2 +-
 .../kubernetes/KubernetesRuntimeFactoryTest.java   |  2 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  3 +-
 .../functions/worker/FunctionMetaDataManager.java  | 19 +++---
 .../functions/worker/FunctionRuntimeManager.java   |  9 ---
 .../functions/worker/PulsarWorkerService.java      |  4 --
 .../pulsar/functions/worker/SchedulerManager.java  |  2 -
 .../pulsar/functions/worker/WorkerService.java     |  1 -
 .../functions/worker/dlog/DLInputStream.java       | 11 ----
 .../pulsar/functions/worker/rest/WorkerServer.java | 13 ----
 .../functions/worker/rest/api/ComponentImpl.java   |  5 +-
 .../functions/worker/rest/api/FunctionsImpl.java   |  2 -
 .../functions/worker/rest/api/SinksImpl.java       |  2 -
 .../functions/worker/rest/api/SourcesImpl.java     |  2 -
 .../functions/worker/rest/api/WorkerImpl.java      |  6 +-
 .../service/WorkerServiceWithClassLoader.java      |  2 -
 .../worker/FunctionMetaDataManagerTest.java        | 12 ++--
 .../elasticsearch/ElasticSearchClientSslTests.java |  2 +-
 .../io/elasticsearch/ElasticSearchSinkTests.java   |  2 +-
 .../org/apache/pulsar/metadata/TestZKServer.java   |  1 +
 .../ProxyExtensionWithClassLoaderTest.java         |  1 +
 .../pulsar/tests/integration/cli/CLITest.java      |  2 +-
 .../integration/functions/PulsarFunctionsTest.java |  4 +-
 .../tests/integration/offload/TestBaseOffload.java |  6 +-
 .../integration/topologies/PulsarTestBase.java     |  6 +-
 .../jcloud/impl/BufferedOffloadStreamTest.java     |  2 +-
 48 files changed, 189 insertions(+), 110 deletions(-)

[pulsar] 07/11: Add @Test annotation to test methods (#12640)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e46a6f7d1f005912b0c1536de9f3d8f630c1e129
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Sat Nov 6 12:07:54 2021 +0900

    Add @Test annotation to test methods (#12640)
    
    (cherry picked from commit 370f47fe17a23b56f34c1e12e7abb28dd794c3b8)
---
 .../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java   | 1 +
 .../broker/intercept/BrokerInterceptorWithClassLoaderTest.java      | 1 +
 .../pulsar/broker/protocol/ProtocolHandlerWithClassLoaderTest.java  | 1 +
 .../org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java    | 1 +
 .../apache/pulsar/client/api/SimpleProducerConsumerStatTest.java    | 1 +
 .../src/test/java/org/apache/pulsar/schema/SchemaTest.java          | 1 +
 .../src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java  | 3 ++-
 .../src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java     | 6 +++---
 .../src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java   | 6 +++---
 .../functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java  | 2 +-
 .../pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java  | 3 ++-
 .../apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java | 2 +-
 .../org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java  | 2 +-
 .../pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java  | 1 +
 .../pulsar/tests/integration/functions/PulsarFunctionsTest.java     | 4 ++--
 .../apache/pulsar/tests/integration/offload/TestBaseOffload.java    | 6 +++---
 .../apache/pulsar/tests/integration/topologies/PulsarTestBase.java  | 6 +++---
 .../mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java      | 2 +-
 18 files changed, 29 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 4a27dbb..a471cab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -915,6 +915,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
     }
 
+    @Test
     public void testGetMessageById() throws Exception {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
         admin.tenants().createTenant("tenant-xyz", tenantInfo);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
index aa4a5bc..ae66937 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoaderTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.verify;
 @Test(groups = "broker")
 public class BrokerInterceptorWithClassLoaderTest {
 
+    @Test
     public void testWrapper() throws Exception {
         BrokerInterceptor h = mock(BrokerInterceptor.class);
         NarClassLoader loader = mock(NarClassLoader.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerWithClassLoaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerWithClassLoaderTest.java
index 42d26d5..75d8c64 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerWithClassLoaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/ProtocolHandlerWithClassLoaderTest.java
@@ -74,6 +74,7 @@ public class ProtocolHandlerWithClassLoaderTest {
         verify(h, times(1)).getProtocolDataToAdvertise();
     }
 
+    @Test
     public void testClassLoaderSwitcher() throws Exception {
         NarClassLoader loader = mock(NarClassLoader.class);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 9b3b354..5e20c09 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -49,6 +49,7 @@ public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Test
     public void testManagedCursorMetrics() throws Exception {
         final String subName = "my-sub";
         final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index cc83ba2..480d835 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -335,6 +335,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
     public void testBatchMessagesRateOut() throws PulsarClientException, InterruptedException, PulsarAdminException {
         log.info("-- Starting {} test --", methodName);
         String topicName = "persistent://my-property/cluster/my-ns/testBatchMessagesRateOut";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index dab9d08..db56fbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -800,6 +800,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         assertEquals("foo", message.getValue());
     }
 
+    @Test
     public void testConsumeMultipleSchemaMessages() throws Exception {
         final String namespace = "test-namespace-" + randomName(16);
         String ns = PUBLIC_TENANT + "/" + namespace;
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
index 0ab093d..cd4c22f 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdClusters.java
@@ -62,7 +62,8 @@ public class TestCmdClusters {
         testCmdClusterConfigFile(clusterData, clusterData);
     }
 
-    public void testCmdClusterConfigFile(ClusterData testClusterData, ClusterData expectedClusterData) throws Exception {
+    private void testCmdClusterConfigFile(ClusterData testClusterData, ClusterData expectedClusterData)
+            throws Exception {
         File file = Files.createTempFile("tmp_cluster", ".yaml").toFile();
         ObjectMapperFactory.getThreadLocalYaml().writeValue(file, testClusterData);
         Assert.assertEquals(testClusterData, CmdUtils.loadConfig(file.getAbsolutePath(), ClusterData.class));
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 87f9a3f..27402bd 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -341,7 +341,7 @@ public class TestCmdSinks {
         );
     }
 
-    public void testCmdSinkCliMissingArgs(
+    private void testCmdSinkCliMissingArgs(
             String tenant,
             String namespace,
             String name,
@@ -492,7 +492,7 @@ public class TestCmdSinks {
         testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
     }
 
-    public void testCmdSinkConfigFile(SinkConfig testSinkConfig, SinkConfig expectedSinkConfig) throws Exception {
+    private void testCmdSinkConfigFile(SinkConfig testSinkConfig, SinkConfig expectedSinkConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();
 
@@ -572,7 +572,7 @@ public class TestCmdSinks {
         );
     }
 
-    public void testMixCliAndConfigFile(
+    private void testMixCliAndConfigFile(
             String tenant,
             String namespace,
             String name,
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 5b4d906..ff75d26 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -261,7 +261,7 @@ public class TestCmdSources {
         );
     }
 
-    public void testCmdSourceCliMissingArgs(
+    private void testCmdSourceCliMissingArgs(
             String tenant,
             String namespace,
             String name,
@@ -500,7 +500,7 @@ public class TestCmdSources {
         );
     }
 
-    public void testMixCliAndConfigFile(
+    private void testMixCliAndConfigFile(
             String tenant,
             String namespace,
             String name,
@@ -674,4 +674,4 @@ public class TestCmdSources {
         Assert.assertEquals(config.get("float_string"), "1000.0");
         Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index 1b8946b..dc0119c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -333,7 +333,7 @@ public class KubernetesRuntimeFactoryTest {
                 "Per instance ram requested, 0, for function should be positive and a multiple of the granularity, 1000");
     }
 
-    public void testAuthProvider(Optional<FunctionAuthProvider> authProvider) throws Exception {
+    private void testAuthProvider(Optional<FunctionAuthProvider> authProvider) throws Exception {
         factory = createKubernetesRuntimeFactory(null, null, null, null, false,
                 authProvider, Optional.empty());
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 928497a..7f6c36a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -358,7 +358,8 @@ public class KubernetesRuntimeTest {
     	testResources(1.0 / 1.5, 1000, 1.3, 1.0);
     }
 
-    public void testResources(double userCpuRequest, long userMemoryRequest, double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
+    private void testResources(double userCpuRequest, long userMemoryRequest, double cpuOverCommitRatio,
+            double memoryOverCommitRatio) throws Exception {
 
         Function.Resources resources = Function.Resources.newBuilder()
                 .setRam(userMemoryRequest).setCpu(userCpuRequest).setDisk(10000L).build();
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
index 9f878aa..7cbd249 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
@@ -157,7 +157,7 @@ public class ElasticSearchClientSslTests {
     }
 
 
-    public void testIndexExists(ElasticSearchClient client) throws IOException {
+    private void testIndexExists(ElasticSearchClient client) throws IOException {
         assertFalse(client.indexExists("mynewindex"));
         assertTrue(client.createIndexIfNeeded("mynewindex"));
         assertTrue(client.indexExists("mynewindex"));
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 9e3363c..c752354 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -292,7 +292,7 @@ public class ElasticSearchSinkTests {
         testNullValue(ElasticSearchConfig.NullValueAction.DELETE);
     }
 
-    public void testNullValue(ElasticSearchConfig.NullValueAction action) throws Exception {
+    private void testNullValue(ElasticSearchConfig.NullValueAction action) throws Exception {
         String index = "testnullvalue" + action.toString().toLowerCase(Locale.ROOT);
         map.put("indexName", index);
         map.put("keyIgnore", "false");
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java
index b43eb22..57882a6 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java
@@ -70,6 +70,7 @@ public class ProxyExtensionWithClassLoaderTest {
         verify(h, times(1)).start(service);
     }
 
+    @Test
     public void testClassLoaderSwitcher() throws Exception {
         NarClassLoader loader = mock(NarClassLoader.class);
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 87357a04..ad97740 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -120,7 +120,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         return kvs;
     }
 
-    public void testFunctionLocalRun(Runtime runtime) throws  Exception {
+    protected void testFunctionLocalRun(Runtime runtime) throws  Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD) {
             return;
         }
@@ -243,7 +243,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
     }
 
-    public void testWindowFunction(String type, String[] expectedResults) throws Exception {
+    protected void testWindowFunction(String type, String[] expectedResults) throws Exception {
         int NUM_OF_MESSAGES = 100;
         int windowLengthCount = 10;
         int slidingIntervalCount = 5;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
index ad7a8fd..f5a320e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
@@ -48,7 +48,7 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite {
         return entry;
     }
 
-    public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
+    protected void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
         final String tenant = "offload-test-cli-" + randomName(4);
         final String namespace = tenant + "/ns1";
         final String topic = "persistent://" + namespace + "/topic1";
@@ -120,7 +120,7 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite {
         }
     }
 
-    public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
+    protected void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
         final String tenant = "offload-test-threshold-" + randomName(4);
         final String namespace = tenant + "/ns1";
         final String topic = "persistent://" + namespace + "/topic1";
@@ -240,7 +240,7 @@ public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite {
         }
     }
 
-    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
+    protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
         final String tenant = "offload-test-deletion-lag-" + randomName(4);
         final String namespace = tenant + "/ns1";
         final String topic = "persistent://" + namespace + "/topic1";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
index ebdfbe8..778c67b 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
@@ -76,7 +76,7 @@ public abstract class PulsarTestBase extends TestRetrySupport {
         }
     }
 
-    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
+    protected void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
         String topicName = generateTopicName("testpubconsume", isPersistent);
 
         int numMessages = 10;
@@ -107,7 +107,7 @@ public abstract class PulsarTestBase extends TestRetrySupport {
         }
     }
 
-    public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
+    protected void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception {
         String topicName = generateTopicName("test-batch-publish-consume", isPersistent);
 
         final int numMessages = 10000;
@@ -142,7 +142,7 @@ public abstract class PulsarTestBase extends TestRetrySupport {
         }
     }
 
-    public void testBatchIndexAckDisabled(String serviceUrl) throws Exception {
+    protected void testBatchIndexAckDisabled(String serviceUrl) throws Exception {
         String topicName = generateTopicName("test-batch-index-ack-disabled", true);
         final int numMessages = 100;
         try (PulsarClient client = PulsarClient.builder()
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java
index 4ebf943..a5dc7ba 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java
@@ -42,7 +42,7 @@ import org.testng.Assert;
 public class BufferedOffloadStreamTest {
     final Random random = new Random();
 
-    public void testWithPadding(int paddingLen) throws Exception {
+    private void testWithPadding(int paddingLen) throws Exception {
         int blockSize = StreamingDataBlockHeaderImpl.getDataStartOffset();
         List<Entry> entryBuffer = new LinkedList<>();
         final UUID uuid = UUID.randomUUID();

[pulsar] 09/11: [Broker] Optimize exception information for schemas (#12647)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3f809e65fd5f85b6b10c1169177d053759cfaf3f
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Sat Nov 6 22:14:19 2021 +0800

    [Broker] Optimize exception information for schemas (#12647)
    
    (cherry picked from commit 36f151ce442ec928c6fcf5840b825284f7f5ae88)
---
 .../apache/pulsar/broker/admin/impl/SchemasResourceBase.java  | 11 +++++++----
 .../java/org/apache/pulsar/tests/integration/cli/CLITest.java |  2 +-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 87fc3ae..dfd870a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -272,9 +272,11 @@ public class SchemasResourceBase extends AdminResource {
     private static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMetadata schema, Throwable error) {
         if (isNull(error)) {
             if (isNull(schema)) {
-                response.resume(Response.status(Response.Status.NOT_FOUND).build());
+                response.resume(Response.status(
+                        Response.Status.NOT_FOUND.getStatusCode(), "Schema not found").build());
             } else if (schema.schema.isDeleted()) {
-                response.resume(Response.status(Response.Status.NOT_FOUND).build());
+                response.resume(Response.status(
+                        Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build());
             } else {
                 response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
                         .entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
@@ -290,7 +292,8 @@ public class SchemasResourceBase extends AdminResource {
             Throwable error) {
         if (isNull(error)) {
             if (isNull(schemas)) {
-                response.resume(Response.status(Response.Status.NOT_FOUND).build());
+                response.resume(Response.status(
+                        Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build());
             } else {
                 response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
                         .entity(GetAllVersionsSchemaResponse.builder()
@@ -312,7 +315,7 @@ public class SchemasResourceBase extends AdminResource {
             validateTopicOwnership(topicName, authoritative);
         } catch (RestException e) {
             if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
-                throw new RestException(Response.Status.NOT_FOUND, "Not Found");
+                throw new RestException(Response.Status.UNAUTHORIZED, e.getMessage());
             } else {
                 throw e;
             }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index bd712ca..f4dc1c8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -334,7 +334,7 @@ public class CLITest extends PulsarTestSuite {
                               );
             fail("Command should have exited with non-zero");
         } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: HTTP 404 Not Found"));
+            assertTrue(e.getResult().getStderr().contains("Schema not found"));
         }
     }
 

[pulsar] 03/11: modify check waitingForPingResponse with volatile (#12615)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 47b8e44660c2f15934381c8d5d13edd0d9d9090a
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Nov 5 22:05:04 2021 +0800

    modify check waitingForPingResponse with volatile (#12615)
    
    (cherry picked from commit 62e2547bea445c4f67935a57f59886757facbd2d)
---
 .../src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 557dbfa..cdf372d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -37,7 +37,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
     protected SocketAddress remoteAddress;
     private int remoteEndpointProtocolVersion = ProtocolVersion.v0.getValue();
     private final long keepAliveIntervalSeconds;
-    private boolean waitingForPingResponse = false;
+    private volatile boolean waitingForPingResponse = false;
     private ScheduledFuture<?> keepAliveTask;
 
     public int getRemoteEndpointProtocolVersion() {
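
Note on the change above: the one-line diff only adds `volatile` to the flag. The sketch below is a minimal, hypothetical illustration of why that matters when one thread sets or clears a keep-alive flag and another thread reads it. The class name `KeepAliveSketch`, the `Action` enum, and the methods `onKeepAliveTick` and `onMessageReceived` are assumptions made for this example and are not taken from `PulsarHandler`; the real handler's logic may differ.

```java
// Hypothetical sketch, not the PulsarHandler implementation.
public class KeepAliveSketch {

    /** What the keep-alive check decides to do on each tick (illustrative only). */
    public enum Action { SEND_PING, CLOSE_CONNECTION }

    // volatile guarantees that a write made by one thread is visible to
    // later reads from other threads. It does NOT make the check-then-set
    // in onKeepAliveTick() atomic.
    private volatile boolean waitingForPingResponse = false;

    /** Assumed to run periodically on a scheduler thread. */
    public Action onKeepAliveTick() {
        if (waitingForPingResponse) {
            // No traffic arrived since the previous ping was sent.
            return Action.CLOSE_CONNECTION;
        }
        waitingForPingResponse = true;
        return Action.SEND_PING;
    }

    /** Assumed to run on the thread that receives data from the peer. */
    public void onMessageReceived() {
        waitingForPingResponse = false;
    }

    public static void main(String[] args) throws InterruptedException {
        KeepAliveSketch handler = new KeepAliveSketch();

        System.out.println(handler.onKeepAliveTick()); // SEND_PING

        // Simulate the peer answering on a different thread.
        Thread io = new Thread(handler::onMessageReceived);
        io.start();
        io.join();

        System.out.println(handler.onKeepAliveTick()); // SEND_PING
        System.out.println(handler.onKeepAliveTick()); // CLOSE_CONNECTION
    }
}
```

In this sketch `volatile` only addresses visibility of the flag between threads. If the check and the write had to happen as one indivisible step, something like `java.util.concurrent.atomic.AtomicBoolean` would be needed instead.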

[pulsar] 08/11: Upgrade debezium to 1.7.1 (#12644)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0ca7151e6a4e7d521bd2eb0986605e00e9ba97a1
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Nov 5 19:59:51 2021 -0700

    Upgrade debezium to 1.7.1 (#12644)
    
    (cherry picked from commit ccfe395ed86886f56269a10e43789d9829c0c00c)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 4dca3ba..f142eb9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,7 +155,7 @@ flexible messaging model and an intuitive client API.</description>
     <presto.version>332</presto.version>
     <scala.binary.version>2.13</scala.binary.version>
     <scala-library.version>2.13.6</scala-library.version>
-    <debezium.version>1.7.0.Final</debezium.version>
+    <debezium.version>1.7.1.Final</debezium.version>
     <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.18.0</opencensus.version>
     <hbase.version>2.3.0</hbase.version>

[pulsar] 01/11: Clean up the metadata of the non-persistent partitioned topics. (#12550)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6e884ccdb9a0e478dae2757c7cd8ff30e80f38b3
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Nov 5 22:11:16 2021 +0800

    Clean up the metadata of the non-persistent partitioned topics. (#12550)
    
    (cherry picked from commit 6ecef2ec9c02e3a79fce88ac1a3cf44dd4f82509)
---
 .../service/nonpersistent/NonPersistentTopic.java     | 19 +++++++++++++++++++
 .../broker/service/NonPersistentTopicE2ETest.java     | 14 ++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 916aa53..4b1c572 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractReplicator;
+import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -881,6 +882,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
                     }
 
                     stopReplProducers().thenCompose(v -> delete(true, false, true))
+                            .thenAccept(__ -> tryToDeletePartitionedMetadata())
                             .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
                             .exceptionally(e -> {
                                 Throwable throwable = e.getCause();
@@ -902,6 +904,23 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         }
     }
 
+    private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
+        if (TopicName.get(topic).isPartitioned() && !deletePartitionedTopicMetadataWhileInactive()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        try {
+            NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
+                    .getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
+            if (!partitionedTopicResources.partitionedTopicExists(topicName)) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return partitionedTopicResources.deletePartitionedTopicAsync(topicName);
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
+    }
+
     @Override
     public void checkInactiveSubscriptions() {
         TopicName name = TopicName.get(topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 1543d3e..9525944 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -52,6 +52,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
         super.baseSetup();
     }
 
@@ -228,5 +230,17 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         producer2.close();
 
         assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 6. Test for partitioned topic to delete the partitioned metadata
+        String topicGc = "non-persistent://prop/ns-abc/topic-gc";
+        int partitions = 5;
+        admin.topics().createPartitionedTopic(topicGc, partitions);
+        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
+        producer3.close();
+        assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions == partitions);
+        runGC();
+        assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions == 0);
     }
 }
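
Note on the change above: tryToDeletePartitionedMetadata() returns a CompletableFuture, but it is called from thenAccept, which discards whatever its callback returns. The sketch below is not Pulsar code (all names are hypothetical). It shows how thenAccept and thenCompose differ when the inner future fails. Whether the broker should wait for the metadata deletion is not stated in this email.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsAcceptDemo {
    // Hypothetical cleanup step that always fails.
    static CompletableFuture<Void> failingCleanup() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("cleanup failed"));
        return future;
    }

    // thenAccept ignores the future returned by the callback,
    // so the failure never reaches the outer chain.
    static boolean chainFailsWithAccept() {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture("deleted")
                .thenAccept(ignored -> failingCleanup());
        return chain.isCompletedExceptionally();
    }

    // thenCompose flattens the returned future into the chain,
    // so the outer chain completes only when the cleanup does, and fails if it fails.
    static boolean chainFailsWithCompose() {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture("deleted")
                .thenCompose(ignored -> failingCleanup());
        return chain.isCompletedExceptionally();
    }
}
```

In this sketch an exceptionally() handler attached after thenAccept would not observe the cleanup failure. One attached after thenCompose would.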

[pulsar] 11/11: Fix cherry-pick check style issue

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b55bba3c75089b2abb3c5cc8450ceec75af95eab
Author: penghui <pe...@apache.org>
AuthorDate: Mon Dec 20 23:18:36 2021 +0800

    Fix cherry-pick check style issue
---
 .../apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 4b1c572..43245b9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -38,8 +38,8 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;

[pulsar] 05/11: Remove wrong visible testing annotation in function workers (#12621)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5e7f9cd60f57ca4591944de3a089bf6cc746ac8c
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Fri Nov 5 15:39:37 2021 +0800

    Remove wrong visible testing annotation in function workers (#12621)
    
    * Remove wrong visible testing annotation
    
    * Remove unused method since 2017
    
    (cherry picked from commit 8496afc58bdd27c47cde8a9ba3c76b80ab796320)
---
 .../functions/worker/FunctionMetaDataManager.java     | 19 +++++++------------
 .../functions/worker/FunctionRuntimeManager.java      |  9 ---------
 .../pulsar/functions/worker/PulsarWorkerService.java  |  4 ----
 .../pulsar/functions/worker/SchedulerManager.java     |  2 --
 .../apache/pulsar/functions/worker/WorkerService.java |  1 -
 .../pulsar/functions/worker/dlog/DLInputStream.java   | 11 -----------
 .../pulsar/functions/worker/rest/WorkerServer.java    | 13 -------------
 .../functions/worker/rest/api/ComponentImpl.java      |  5 +----
 .../functions/worker/rest/api/FunctionsImpl.java      |  2 --
 .../pulsar/functions/worker/rest/api/SinksImpl.java   |  2 --
 .../pulsar/functions/worker/rest/api/SourcesImpl.java |  2 --
 .../pulsar/functions/worker/rest/api/WorkerImpl.java  |  6 +-----
 .../worker/service/WorkerServiceWithClassLoader.java  |  2 --
 .../functions/worker/FunctionMetaDataManagerTest.java | 12 ++++++------
 14 files changed, 15 insertions(+), 75 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 1ff1b2b..5388bd7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -62,7 +61,6 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
 public class FunctionMetaDataManager implements AutoCloseable {
     // Represents the global state
     // tenant -> namespace -> (function name, FunctionRuntimeInfo)
-    @VisibleForTesting
     final Map<String, Map<String, Map<String, FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
 
     private final SchedulerManager schedulerManager;
@@ -240,7 +238,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
             }
             lastMessageSeen = builder.send();
             if (delete) {
-                needsScheduling = proccessDeregister(functionMetaData);
+                needsScheduling = processDeregister(functionMetaData);
             } else {
                 needsScheduling = processUpdate(functionMetaData);
             }
@@ -359,7 +357,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
                 this.processUpdate(serviceRequest.getFunctionMetaData());
                 break;
             case DELETE:
-                this.proccessDeregister(serviceRequest.getFunctionMetaData());
+                this.processDeregister(serviceRequest.getFunctionMetaData());
                 break;
             default:
                 log.warn("Received request with unrecognized type: {}", serviceRequest);
@@ -373,7 +371,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
         String functionName = FunctionCommon.extractNameFromFullyQualifiedName(message.getKey());
         if (message.getData() == null || message.getData().length == 0) {
             // this is a delete message
-            this.proccessDeregister(tenant, namespace, functionName, version);
+            this.processDeregister(tenant, namespace, functionName, version);
         } else {
             FunctionMetaData functionMetaData = FunctionMetaData.parseFrom(message.getData());
             this.processUpdate(functionMetaData);
@@ -404,16 +402,15 @@ public class FunctionMetaDataManager implements AutoCloseable {
         return false;
     }
 
-    @VisibleForTesting
-    synchronized boolean proccessDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
+    synchronized boolean processDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
         String functionName = deregisterRequestFs.getFunctionDetails().getName();
         String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
         String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
-        return proccessDeregister(tenant, namespace, functionName, deregisterRequestFs.getVersion());
+        return processDeregister(tenant, namespace, functionName, deregisterRequestFs.getVersion());
     }
 
-    synchronized boolean proccessDeregister(String tenant, String namespace,
-                                            String functionName, long version) throws IllegalArgumentException {
+    synchronized boolean processDeregister(String tenant, String namespace,
+                                           String functionName, long version) throws IllegalArgumentException {
 
         boolean needsScheduling = false;
 
@@ -437,7 +434,6 @@ public class FunctionMetaDataManager implements AutoCloseable {
         return needsScheduling;
     }
 
-    @VisibleForTesting
     synchronized boolean processUpdate(FunctionMetaData updateRequestFs) throws IllegalArgumentException {
 
         log.debug("Process update request: {}", updateRequestFs);
@@ -481,7 +477,6 @@ public class FunctionMetaDataManager implements AutoCloseable {
         return currentFunctionMetaData.getVersion() >= version;
     }
 
-    @VisibleForTesting
     void setFunctionMetaData(FunctionMetaData functionMetaData) {
         Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
         if (!this.functionMetaDataMap.containsKey(functionDetails.getTenant())) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index eaad132..d37049f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -79,12 +79,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     // all assignments
     // WorkerId -> Function Fully Qualified InstanceId -> List<Assignments>
-    @VisibleForTesting
     Map<String, Map<String, Assignment>> workerIdToAssignments = new ConcurrentHashMap<>();
 
     // All the runtime info related to functions executed by this worker
     // Fully Qualified InstanceId - > FunctionRuntimeInfo
-    @VisibleForTesting
     class FunctionRuntimeInfos {
 
         private Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();
@@ -114,10 +112,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
         }
     }
 
-    @VisibleForTesting
     final FunctionRuntimeInfos functionRuntimeInfos = new FunctionRuntimeInfos();
 
-    @VisibleForTesting
     @Getter
     final WorkerConfig workerConfig;
 
@@ -267,10 +263,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     /**
-     * Starts the function runtime manager
-     */
-
-    /**
      * Get current assignments
      * @return a map of current assignments in the following format
      * {workerId : {FullyQualifiedInstanceId : Assignment}}
@@ -827,7 +819,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
         }
     }
 
-    @VisibleForTesting
     void deleteAssignment(Assignment assignment) {
         String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
         Map<String, Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 9c47dfe..3056a0d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -45,7 +45,6 @@ import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -55,12 +54,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.path.PolicyPath;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
@@ -188,7 +185,6 @@ public class PulsarWorkerService implements WorkerService {
         );
     }
 
-    @VisibleForTesting
     public void init(WorkerConfig workerConfig,
                      URI dlogUri,
                      boolean runAsStandalone) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 130e839..2fed499 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -371,7 +371,6 @@ public class SchedulerManager implements AutoCloseable {
         return currentMembership;
     }
 
-    @VisibleForTesting
     void invokeScheduler() {
         long startTime = System.nanoTime();
 
@@ -561,7 +560,6 @@ public class SchedulerManager implements AutoCloseable {
         assignmentsMovedInLastDrain = null;
     }
 
-    @VisibleForTesting
     List<Assignment> invokeDrain(String workerId) {
 
         long startTime = System.nanoTime();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index f76f753..f5293ce 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.worker;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
index 10acb54..27af304 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
@@ -40,10 +40,8 @@ public class DLInputStream extends InputStream {
   // Cache the input stream for a log record.
   private static class LogRecordWithInputStream {
     private final InputStream payloadStream;
-    private final LogRecordWithDLSN logRecord;
 
     LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
-      this.logRecord = logRecord;
       this.payloadStream = logRecord.getPayLoadInputStream();
     }
 
@@ -51,15 +49,6 @@ public class DLInputStream extends InputStream {
       return payloadStream;
     }
 
-    LogRecordWithDLSN getLogRecord() {
-      return logRecord;
-    }
-
-    // The last txid of the log record is the position of the next byte in the stream.
-    // Subtract length to get starting offset.
-    long getOffset() {
-      return logRecord.getTransactionId() - logRecord.getPayload().length;
-    }
   }
 
   /**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index c7414c2..e531084 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
-import com.google.common.annotations.VisibleForTesting;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -46,8 +45,6 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 
-import java.net.BindException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -69,15 +66,6 @@ public class WorkerServer {
     private ServerConnector httpConnector;
     private ServerConnector httpsConnector;
 
-    private static String getErrorMessage(Server server, int port, Exception ex) {
-        if (ex instanceof BindException) {
-            final URI uri = server.getURI();
-            return String.format("%s http://%s:%d", ex.getMessage(), uri.getHost(), port);
-        }
-
-        return ex.getMessage();
-    }
-
     public WorkerServer(WorkerService workerService, AuthenticationService authenticationService) {
         this.workerConfig = workerService.getWorkerConfig();
         this.workerService = workerService;
@@ -191,7 +179,6 @@ public class WorkerServer {
         return contextHandler;
     }
 
-    @VisibleForTesting
     public void stop() {
         if (this.server != null) {
             try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index c1bc996..e8d67a5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -267,10 +267,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
         if (workerService == null) {
             return false;
         }
-        if (!workerService.isInitialized()) {
-            return false;
-        }
-        return true;
+        return workerService.isInitialized();
     }
 
     PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 8c35851..b00846d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -32,7 +32,6 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,7 +63,6 @@ import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.service.api.Functions;
-import org.apache.pulsar.packages.management.core.common.PackageType;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 @Slf4j
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index bbb1680..ab69ab9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,7 +63,6 @@ import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.service.api.Sinks;
-import org.apache.pulsar.packages.management.core.common.PackageType;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 @Slf4j
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 82a818d..df2dca8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,7 +63,6 @@ import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.service.api.Sources;
-import org.apache.pulsar.packages.management.core.common.PackageType;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
 @Slf4j
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index f32f6d1..6c2180c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import lombok.extern.slf4j.Slf4j;
-import lombok.val;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -76,10 +75,7 @@ public class WorkerImpl implements Workers<PulsarWorkerService> {
         if (workerService == null) {
             return false;
         }
-        if (!workerService.isInitialized()) {
-            return false;
-        }
-        return true;
+        return workerService.isInitialized();
     }
 
     @Override
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/WorkerServiceWithClassLoader.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/WorkerServiceWithClassLoader.java
index 9439d72..6959616 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/WorkerServiceWithClassLoader.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/WorkerServiceWithClassLoader.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -38,7 +37,6 @@ import org.apache.pulsar.functions.worker.service.api.FunctionsV2;
 import org.apache.pulsar.functions.worker.service.api.Sinks;
 import org.apache.pulsar.functions.worker.service.api.Sources;
 import org.apache.pulsar.functions.worker.service.api.Workers;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
 
 /**
  * A worker service with its classloader.
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index e5221bd..f7d133d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -299,7 +299,7 @@ public class FunctionMetaDataManagerTest {
                         mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
 
         doReturn(true).when(functionMetaDataManager).processUpdate(any(Function.FunctionMetaData.class));
-        doReturn(true).when(functionMetaDataManager).proccessDeregister(any(Function.FunctionMetaData.class));
+        doReturn(true).when(functionMetaDataManager).processDeregister(any(Function.FunctionMetaData.class));
 
         Request.ServiceRequest serviceRequest
                 = Request.ServiceRequest.newBuilder().setServiceRequestType(
@@ -324,9 +324,9 @@ public class FunctionMetaDataManagerTest {
         doReturn(serviceRequest.toByteArray()).when(msg).getData();
         functionMetaDataManager.processMetaDataTopicMessage(msg);
 
-        verify(functionMetaDataManager, times(1)).proccessDeregister(
+        verify(functionMetaDataManager, times(1)).processDeregister(
                 any(Function.FunctionMetaData.class));
-        verify(functionMetaDataManager).proccessDeregister(serviceRequest.getFunctionMetaData());
+        verify(functionMetaDataManager).processDeregister(serviceRequest.getFunctionMetaData());
     }
 
     @Test
@@ -393,7 +393,7 @@ public class FunctionMetaDataManagerTest {
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
                         .setNamespace("namespace-1").setTenant("tenant-1")).build();
 
-        Assert.assertFalse(functionMetaDataManager.proccessDeregister(m1));
+        Assert.assertFalse(functionMetaDataManager.processDeregister(m1));
         verify(functionMetaDataManager, times(0))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(0)).schedule();
@@ -411,7 +411,7 @@ public class FunctionMetaDataManagerTest {
 
         // outdated delete request
         try {
-            functionMetaDataManager.proccessDeregister(m1);
+            functionMetaDataManager.processDeregister(m1);
             Assert.assertTrue(false);
         } catch (IllegalArgumentException e) {
             Assert.assertEquals(e.getMessage(), "Delete request ignored because it is out of date. Please try again.");
@@ -426,7 +426,7 @@ public class FunctionMetaDataManagerTest {
 
         // delete now
         m1 = m1.toBuilder().setVersion(2).build();
-        Assert.assertTrue(functionMetaDataManager.proccessDeregister(m1));
+        Assert.assertTrue(functionMetaDataManager.processDeregister(m1));
         verify(functionMetaDataManager, times(1))
                 .setFunctionMetaData(any(Function.FunctionMetaData.class));
         verify(schedulerManager, times(0)).schedule();

[pulsar] 04/11: Fix String should use equals but not ==. (#12619)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4421d1704fa1a8472110f6680c4614f1815a2c07
Author: liuchangqing <ca...@126.com>
AuthorDate: Fri Nov 5 14:31:48 2021 +0800

    Fix String should use equals but not ==. (#12619)
    
    Fix String should use equals but not ==
    
    (cherry picked from commit c2165e898f4e5cda7e89f5a7106263e7a1ec48ac)
---
 .../test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 006c287..105405f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -228,7 +228,7 @@ public abstract class BookKeeperClusterTestCase {
         conf.setBookiePort(port);
         if (ledgerRootPath != "") {
             conf.setMetadataServiceUri("zk://" + zkUtil.getZooKeeperConnectString() + ledgerRootPath);
-        }else {
+        } else {
             conf.setZkServers(zkServers);
         }
         conf.setJournalDirName(journalDir.getPath());
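
Note that the hunk above only changes the spacing around `else`; the context line `if (ledgerRootPath != "")` still compares by reference. The following is a minimal sketch, not taken from the commit, of why the commit title recommends `equals` over `==` for strings. The class and method names are hypothetical and exist only for illustration.

```java
public class StringEqualityDemo {

    // Reference comparison: false only when path is the very same object as the literal "".
    static boolean isNonEmptyByReference(String path) {
        return path != "";
    }

    // Content comparison: true whenever the string holds at least one character.
    static boolean isNonEmptyByContent(String path) {
        return path != null && !path.isEmpty();
    }

    public static void main(String[] args) {
        // An empty string created at runtime is a distinct object from the interned literal "".
        String runtimeEmpty = new String("");

        System.out.println(isNonEmptyByReference(runtimeEmpty)); // prints true (misreported as non-empty)
        System.out.println(isNonEmptyByContent(runtimeEmpty));   // prints false
        System.out.println(isNonEmptyByContent("/ledgers"));     // prints true
    }
}
```

The reference check happens to work when both sides are compile-time literals, which is presumably why code like this can go unnoticed in tests; the content check behaves the same regardless of how the string was created.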

[pulsar] 02/11: [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 588dbaa3ac48f757f1e7fc6f1a4b3aef162b2309
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 5 00:55:59 2021 +0800

    [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)
    
    * [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction.
    
    For a non-durable cursor, the ledger trimming task moves the cursor past the removed ledgers
    so that readers do not introduce backlog and data can be removed once it exceeds retention;
    see #6787 for more details.
    
    But for a topic with compaction enabled, this causes the reader to skip the compacted data.
    The newly added test illustrates the problem. When reading compacted data, reading a message ID
    that is earlier than the first message ID of the original data is normal behavior, so we should not
    move forward a cursor that will read the compacted data.
    
    * Fix checkstyle.
    
    * Fix tests.
    
    * Fix tests.
    
    (cherry picked from commit a6b1b34a5c028b74bd44c5b8f32b42752b6cec14)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../mledger/impl/NonDurableCursorImpl.java         | 10 +++
 .../AbstractDispatcherSingleActiveConsumer.java    | 10 ++-
 ...onPersistentDispatcherSingleActiveConsumer.java |  2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  4 +-
 .../pulsar/compaction/CompactedTopicTest.java      | 73 ++++++++++++++++++++++
 6 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ce897f3..a59ce22 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2548,7 +2548,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
             // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
-                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
+                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
+                    && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
                 cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 15b1f04..0f7ffe41 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
+    private volatile boolean readCompacted;
+
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
                          PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
         super(bookkeeper, config, ledger, cursorName);
@@ -116,6 +118,14 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         callback.deleteCursorComplete(ctx);
     }
 
+    public void setReadCompacted(boolean readCompacted) {
+        this.readCompacted = readCompacted;
+    }
+
+    public boolean isReadCompacted() {
+        return readCompacted;
+    }
+
     @Override
     public synchronized String toString() {
         return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 690a598..4c7ea45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,6 +26,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -45,7 +47,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     protected boolean isKeyHashRangeFiltered = false;
     protected CompletableFuture<Void> closeFuture = null;
     protected final int partitionIndex;
-
+    protected final ManagedCursor cursor;
     // This dispatcher supports both the Exclusive and Failover subscription types
     protected final SubType subscriptionType;
 
@@ -59,12 +61,13 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
                                                   String topicName, Subscription subscription,
-                                                  ServiceConfiguration serviceConfig) {
+                                                  ServiceConfiguration serviceConfig, ManagedCursor cursor) {
         super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
         this.subscriptionType = subscriptionType;
+        this.cursor = cursor;
         ACTIVE_CONSUMER_UPDATER.set(this, null);
     }
 
@@ -178,6 +181,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 consumer.notifyActiveConsumerChange(currentActiveConsumer);
             }
         }
+        if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) {
+            ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
+        }
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 6094ab7..5cdbff1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -44,7 +44,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
                                                        NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), null);
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 848cbb4..6161847 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -58,7 +58,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         implements Dispatcher, ReadEntriesCallback {
 
     protected final PersistentTopic topic;
-    protected final ManagedCursor cursor;
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
@@ -73,11 +72,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
                                                     PersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), cursor);
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
-        this.cursor = cursor;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
             TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 608d99b..cbe7372 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -509,4 +509,77 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testReadCompactedDataWhenLedgerRolloverKickIn() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-" +
+                UUID.randomUUID();
+        final int numMessages = 2000;
+        final int keys = 200;
+        final String msg = "Test";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(numMessages)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys) + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        // Make sure we have more than 1 original ledgers
+        admin.topics().unload(topic);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin.topics().getInternalStats(topic).ledgers.size(), 2);
+        });
+
+        // Start a new reader to reading messages
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .receiverQueueSize(10)
+                .create();
+
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys * 2) + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys * 3);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        // The reader should read all 600 keys
+        int received = 0;
+        while (reader.hasMessageAvailable()) {
+            System.out.println(reader.readNext().getKey());
+            received++;
+        }
+        Assert.assertEquals(received, keys * 3);
+    }
 }
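
The condition added to `ManagedLedgerImpl` in this commit is a double negation that can be hard to read. The sketch below restates it with small stand-in types, assuming only what the diff shows: a non-durable cursor carries a `readCompacted` flag, and trimming must not advance such a cursor while the flag is set. `TrimGuardSketch`, `Cursor`, `NonDurableCursor`, `DurableCursor` and `mayAdvanceOnTrim` are hypothetical names, not Pulsar classes or methods.

```java
public class TrimGuardSketch {

    // Hypothetical stand-in for ManagedCursor, reduced to the one method the guard needs.
    interface Cursor {
        boolean isDurable();
    }

    // Hypothetical stand-in for NonDurableCursorImpl with the flag added by the commit.
    static final class NonDurableCursor implements Cursor {
        private volatile boolean readCompacted;

        @Override
        public boolean isDurable() {
            return false;
        }

        void setReadCompacted(boolean readCompacted) {
            this.readCompacted = readCompacted;
        }

        boolean isReadCompacted() {
            return readCompacted;
        }
    }

    // Hypothetical durable cursor; trimming treats it as before.
    static final class DurableCursor implements Cursor {
        @Override
        public boolean isDurable() {
            return true;
        }
    }

    // Mirrors the third condition in the diff: the cursor may be moved unless it is
    // a non-durable cursor whose active consumer reads compacted data.
    static boolean mayAdvanceOnTrim(Cursor cursor) {
        boolean readsCompacted = !cursor.isDurable()
                && cursor instanceof NonDurableCursor
                && ((NonDurableCursor) cursor).isReadCompacted();
        return !readsCompacted;
    }

    public static void main(String[] args) {
        NonDurableCursor reader = new NonDurableCursor();
        System.out.println(mayAdvanceOnTrim(reader));              // prints true

        reader.setReadCompacted(true);
        System.out.println(mayAdvanceOnTrim(reader));              // prints false

        System.out.println(mayAdvanceOnTrim(new DurableCursor())); // prints true
    }
}
```

In the actual change the flag is set from the dispatcher when the active consumer is picked, so the sketch's `setReadCompacted(true)` call stands in for that step.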

[pulsar] 06/11: [pulsar-broker] Add git branch information for PulsarVersion (#12541)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1aa80b7554672cd307e4d730ae946c954d1b695f
Author: litao <to...@gmail.com>
AuthorDate: Sat Nov 6 11:14:31 2021 +0800

    [pulsar-broker] Add git branch information for PulsarVersion (#12541)
    
    (cherry picked from commit bc8e40c7c26791ea5e030eb790318f8443014887)
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java         | 1 +
 .../src/main/java-templates/org/apache/pulsar/PulsarVersion.java      | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6b59862..28ccbb1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -582,6 +582,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         LOG.info("Starting Pulsar Broker service; version: '{}'",
                 (brokerVersion != null ? brokerVersion : "unknown"));
         LOG.info("Git Revision {}", PulsarVersion.getGitSha());
+        LOG.info("Git Branch {}", PulsarVersion.getGitBranch());
         LOG.info("Built by {} on {} at {}",
                 PulsarVersion.getBuildUser(),
                 PulsarVersion.getBuildHost(),
diff --git a/pulsar-common/src/main/java-templates/org/apache/pulsar/PulsarVersion.java b/pulsar-common/src/main/java-templates/org/apache/pulsar/PulsarVersion.java
index 07f97cd..e1e57e1 100644
--- a/pulsar-common/src/main/java-templates/org/apache/pulsar/PulsarVersion.java
+++ b/pulsar-common/src/main/java-templates/org/apache/pulsar/PulsarVersion.java
@@ -82,6 +82,10 @@ public class PulsarVersion {
         }
     }
 
+    public static String getGitBranch() {
+        return "${git.branch}";
+    }
+
     public static String getBuildUser() {
         String email = "${git.build.user.email}";
         String name = "${git.build.user.name}";

[pulsar] 10/11: Close Zk database on unit tests (#12649)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit af1c19430089cdd95535a8d3afe2affba7b84d5f
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sat Nov 6 22:13:02 2021 +0800

    Close Zk database on unit tests (#12649)
    
    (cherry picked from commit 701df3206d274682d752d4e6405fb3ce65ffe197)
---
 .../src/test/java/org/apache/pulsar/metadata/TestZKServer.java           | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
index 4f83906..9e1f7bc 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
@@ -100,6 +100,7 @@ public class TestZKServer implements AutoCloseable {
     public void stop() throws Exception {
         if (zks != null) {
             zks.shutdown();
+            zks.getZKDatabase().close();
             zks = null;
         }