You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/09/03 01:00:52 UTC

[pulsar] branch branch-2.8 updated (0448c27 -> cc09a38)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 0448c27  Fix cherry-pick issue after cherry-pick #11294
     new f9b02ca  [Issue 11814] fix pulsar admin method:getMessageById. (#11852)
     new cc09a38  Fix the Pulsar Proxy flaky test. (#11900)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build/run_unit_group.sh                            | 14 ++++------
 .../broker/admin/impl/PersistentTopicsBase.java    |  7 +++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 32 ++++++++++++++++++++++
 3 files changed, 45 insertions(+), 8 deletions(-)

[pulsar] 01/02: [Issue 11814] fix pulsar admin method:getMessageById. (#11852)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f9b02cae83fb1999ce381663e084a4e8de0cd1e8
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Thu Sep 2 09:49:05 2021 +0800

    [Issue 11814] fix pulsar admin method:getMessageById. (#11852)
    
    Fix https://github.com/apache/pulsar/issues/11814 , if we use another topic to find the message, it will return the message, but we may contaminate the ledgers cache in the topic.
    
    **changes**
    Add check in the method 'internalGetMessageById' in PersistentTopicsBase, if the ledgerId not belong to this topic, throw a exception.
    
    (cherry picked from commit 9bfb3dba7c13e8250e1002efdeb39eec56f7e2da)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  7 +++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 32 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f63d318..c7edc77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2298,6 +2298,13 @@ public class PersistentTopicsBase extends AdminResource {
             }
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
+            if (null == ledger.getLedgerInfo(ledgerId).get()) {
+                log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
+                                + "the ledgerId does not belong to this topic.",
+                        clientAppId(), ledgerId, entryId, topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        "Message not found, the ledgerId does not belong to this topic"));
+            }
             ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
                 @Override
                 public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index ad37635..dd74400 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -857,4 +858,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(10000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
     }
+
+    public void testGetMessageById() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
+        final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
+        final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
+        admin.topics().createNonPartitionedTopic(topicName1);
+        admin.topics().createNonPartitionedTopic(topicName2);
+        ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
+                .enableBatching(false).create();
+        String data1 = "test1";
+        MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes());
+
+        ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2)
+                .enableBatching(false).create();
+        String data2 = "test2";
+        MessageIdImpl id2 = (MessageIdImpl) producer2.send(data2.getBytes());
+
+        Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
+        Assert.assertEquals(message1.getData(), data1.getBytes());
+
+        Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
+        Assert.assertEquals(message2.getData(), data2.getBytes());
+
+        Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
+        Assert.assertNull(message3);
+
+        Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
+        Assert.assertNull(message4);
+    }
 }

[pulsar] 02/02: Fix the Pulsar Proxy flaky test. (#11900)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cc09a387eda9a164996a655421dae1bf40039485
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Sep 3 00:52:20 2021 +0800

    Fix the Pulsar Proxy flaky test. (#11900)
    
    ```
    Error:  Tests run: 6, Failures: 3, Errors: 0, Skipped: 3, Time elapsed: 13.646 s <<< FAILURE! - in org.apache.pulsar.proxy.server.ProxyServiceStarterTest
      Error:  setup(org.apache.pulsar.proxy.server.ProxyServiceStarterTest)  Time elapsed: 2.323 s  <<< FAILURE!
      java.lang.IllegalArgumentException: Collector already registered that provides name: jvm_memory_direct_bytes_used
      	at io.prometheus.client.CollectorRegistry.register(CollectorRegistry.java:54)
      	at io.prometheus.client.Collector.register(Collector.java:139)
      	at org.apache.pulsar.proxy.server.ProxyServiceStarter.start(ProxyServiceStarter.java:183)
      	at org.apache.pulsar.proxy.server.ProxyServiceStarterTest.setup(ProxyServiceStarterTest.java:64)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
      	at org.testng.internal.MethodInvocationHelper.invokeMethodConsideringTimeout(MethodInvocationHelper.java:61)
      	at org.testng.internal.ConfigInvoker.invokeConfigurationMethod(ConfigInvoker.java:366)
      	at org.testng.internal.ConfigInvoker.invokeConfigurations(ConfigInvoker.java:320)
      	at org.testng.internal.TestMethodWorker.invokeBeforeClassMethods(TestMethodWorker.java:176)
      	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:122)
      	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
      	at org.testng.TestRunner.privateRun(TestRunner.java:764)
      	at org.testng.TestRunner.run(TestRunner.java:585)
      	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
      	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
      	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
      	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
      	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
      	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
      	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
      	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
      	at org.testng.TestNG.runSuites(TestNG.java:1069)
      	at org.testng.TestNG.run(TestNG.java:1037)
      	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:135)
      	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:112)
      	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeLazy(TestNGDirectoryTestSuite.java:123)
      	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:90)
      	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:146)
      	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
      	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
      	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
      	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
    ```
    
    (cherry picked from commit df9ae16cffaaf8699333f97c0114ee243d292827)
---
 build/run_unit_group.sh | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 449e91b..77082cf 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -103,14 +103,12 @@ function broker_flaky() {
 }
 
 function proxy() {
-  echo "::endgroup::"
-  echo "::group::Running quarantined pulsar-proxy tests"
-  $MVN_COMMAND test -pl pulsar-proxy -Dgroups='quarantine' -DexcludedGroups='' -DfailIfNoTests=false ||
-    print_testng_failures pulsar-proxy/target/surefire-reports/testng-failed.xml "Quarantined test failure in" "Quarantined test failures"
-  echo "::endgroup::"
-  echo "::group::Running pulsar-proxy tests"
-  $MVN_TEST_COMMAND -pl pulsar-proxy
-  echo "::endgroup::"
+    echo "::group::Running pulsar-proxy tests"
+    $MVN_TEST_COMMAND -pl pulsar-proxy -Dtest="org.apache.pulsar.proxy.server.ProxyServiceTlsStarterTest"
+    $MVN_TEST_COMMAND -pl pulsar-proxy -Dtest="org.apache.pulsar.proxy.server.ProxyServiceStarterTest"
+    $MVN_TEST_COMMAND -pl pulsar-proxy -Dexclude='org.apache.pulsar.proxy.server.ProxyServiceTlsStarterTest,
+                                                  org.apache.pulsar.proxy.server.ProxyServiceStarterTest'
+    echo "::endgroup::"
 }
 
 function other() {