Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/23 19:20:25 UTC

[pulsar] branch branch-2.10 updated (b822d6f -> e2778b2)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from b822d6f  Revert "[PIP 97] Update Authentication Interfaces to Include Async Authentication Methods (#12104)" (#14424)
     new b90c87b  Fix TableViewImpl not retrying partition update on exceptions (#14408)
     new e2778b2  Fix Field 'consumer_epoch' is not set in ServerCnx (#14410)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/service/ServerCnx.java    |  3 +-
 .../apache/pulsar/client/impl/TableViewTest.java   | 57 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/TableViewImpl.java   |  3 +-
 3 files changed, 61 insertions(+), 2 deletions(-)

[pulsar] 02/02: Fix Field 'consumer_epoch' is not set in ServerCnx (#14410)

Posted by mm...@apache.org.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2778b26eff9134bcde1fbe2e84aaca86edfceb5
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Feb 23 12:03:07 2022 +0800

    Fix Field 'consumer_epoch' is not set in ServerCnx (#14410)
    
    ### Motivation
    
    Test case `SimpleProducerConsumerTest#testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause` fails when debug logging is enabled.
    
    The root cause is that `redeliver.getConsumerEpoch()` is used in the debug log without checking whether the field is set.
    
    ```
    2022-02-22T13:13:09,216+0800 [pulsar-io-6-1] WARN  ServerCnx - [/127.0.0.1:64428] Got exception java.lang.IllegalStateException: Field 'consumer_epoch' is not set
    	at org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages.getConsumerEpoch(CommandRedeliverUnacknowledgedMessages.java:87)
    	at org.apache.pulsar.broker.service.ServerCnx.handleRedeliverUnacknowledged(ServerCnx.java:1559)
    	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:274)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    ```
    
    ### Modifications
    
    Check `redeliver.hasConsumerEpoch()` before calling `redeliver.getConsumerEpoch()` in the debug log.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
    Check the box below and label this PR (if you have committer privilege).
    
    Need to update docs?
    
    - [x] `no-need-doc`
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4ad42c0..f216902 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1556,7 +1556,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         checkArgument(state == State.Connected);
         if (log.isDebugEnabled()) {
             log.debug("[{}] redeliverUnacknowledged from consumer {}, consumerEpoch {}",
-                    remoteAddress, redeliver.getConsumerId(), redeliver.getConsumerEpoch());
+                    remoteAddress, redeliver.getConsumerId(),
+                    redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null);
         }
 
         CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
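
The ServerCnx change guards a getter that throws when an optional field is unset. A minimal sketch of that has-before-get pattern follows. `RedeliverCommand` and `ConsumerEpochDemo` are hypothetical stand-ins written for illustration; they are not Pulsar's generated `CommandRedeliverUnacknowledgedMessages` class and only mimic the behaviour visible in the stack trace.

```java
// Hypothetical stand-in for a generated message with an optional field.
// It only mimics "getter throws when the field is unset".
final class RedeliverCommand {
    private long consumerEpoch;
    private boolean consumerEpochSet;

    RedeliverCommand setConsumerEpoch(long epoch) {
        this.consumerEpoch = epoch;
        this.consumerEpochSet = true;
        return this;
    }

    boolean hasConsumerEpoch() {
        return consumerEpochSet;
    }

    long getConsumerEpoch() {
        if (!consumerEpochSet) {
            throw new IllegalStateException("Field 'consumer_epoch' is not set");
        }
        return consumerEpoch;
    }
}

final class ConsumerEpochDemo {
    // Returns the epoch for logging, or null when the field was never set.
    static Long epochForLog(RedeliverCommand cmd) {
        return cmd.hasConsumerEpoch() ? Long.valueOf(cmd.getConsumerEpoch()) : null;
    }

    public static void main(String[] args) {
        System.out.println("unset: " + epochForLog(new RedeliverCommand()));
        System.out.println("set:   " + epochForLog(new RedeliverCommand().setConsumerEpoch(7L)));
    }
}
```

Returning a boxed `Long` lets the log statement print `null` for older clients that do not send the field, instead of failing inside the logging call.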

[pulsar] 01/02: Fix TableViewImpl not retrying partition update on exceptions (#14408)

Posted by mm...@apache.org.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b90c87b72d5ec9a6d3f05ead716dc42b804f799b
Author: Ziyao Wei <zi...@gmail.com>
AuthorDate: Wed Feb 23 14:03:42 2022 -0500

    Fix TableViewImpl not retrying partition update on exceptions (#14408)
    
    * Fix TableViewImpl not retrying partition update on exceptions
    
    * Log exception and undo whitespace changes
    
    * Change topic name in unit test
---
 .../apache/pulsar/client/impl/TableViewTest.java   | 57 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/TableViewImpl.java   |  3 +-
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 9d235ee..4f0b704 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -30,12 +30,16 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -158,4 +162,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         });
         Assert.assertEquals(tv.keySet(), keys2);
     }
+
+
+    @Test(timeOut = 30 * 1000)
+    // Regression test for making sure partition changes are always periodically checked even after a check returned
+    // exceptionally.
+    public void testTableViewUpdatePartitionsTriggeredDespiteExceptions() throws Exception {
+        String topic = "persistent://public/default/tableview-test-update-partitions-triggered-despite-exceptions";
+        admin.topics().createPartitionedTopic(topic, 3);
+        int count = 20;
+        Set<String> keys = this.publishMessages(topic, count, false);
+        PulsarClient spyPulsarClient = Mockito.spy(pulsarClient);
+        @Cleanup
+        TableView<byte[]> tv = spyPulsarClient.newTableViewBuilder(Schema.BYTES)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+        log.info("start tv size: {}", tv.size());
+        tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
+        Awaitility.await().untilAsserted(() -> {
+            log.info("Current tv size: {}", tv.size());
+            Assert.assertEquals(tv.size(), count);
+        });
+        Assert.assertEquals(tv.keySet(), keys);
+        tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
+
+        // Let update partition check throw an exception
+        Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("")))
+                .when(spyPulsarClient)
+                .getPartitionsForTopic(Mockito.any());
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        TopicName topicName = TopicName.get(topic);
+
+        // Make sure the get partitions callback is called; it should throw an exception
+        Mockito.verify(spyPulsarClient).getPartitionsForTopic(Mockito.any());
+
+        // Send more data to partition 3, which is not in the current TableView, need update partitions
+        Set<String> keys2 =
+                this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
+
+        // Wait for 10 seconds; verify that the messages haven't arrived, which would have happened if the partitions
+        // has been updated
+        TimeUnit.SECONDS.sleep(10);
+        Assert.assertEquals(tv.size(), count);
+
+        // Let update partition check succeed, and check the messages eventually arrives
+        Mockito.doCallRealMethod().when(spyPulsarClient).getPartitionsForTopic(Mockito.any());
+        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+            log.info("Current tv size: {}", tv.size());
+            Assert.assertEquals(tv.size(), count * 2);
+        });
+        Assert.assertEquals(tv.keySet(), keys2);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index ab9bf11..e767156 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -105,7 +105,8 @@ public class TableViewImpl<T> implements TableView<T> {
 
         start().whenComplete((tw, ex) -> {
            if (ex != null) {
-               log.warn("Failed to check for changes in number of partitions");
+               log.warn("Failed to check for changes in number of partitions: {}", ex);
+               schedulePartitionsCheck();
            }
         });
     }
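
The TableViewImpl change re-arms the partition check when the previous check completed exceptionally. The sketch below illustrates that idea in isolation. `PeriodicCheck`, `PeriodicCheckDemo` and the queue-based timer are hypothetical stand-ins, not Pulsar classes. As a simplification, the sketch re-arms in one place for both outcomes, whereas the patch only adds the reschedule call on the failure path.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch, not Pulsar code: a check that always re-arms itself.
final class PeriodicCheck {
    private final Supplier<CompletableFuture<Void>> check;
    private final Queue<Runnable> timer; // stand-in for a real timer
    private int failures;

    PeriodicCheck(Supplier<CompletableFuture<Void>> check, Queue<Runnable> timer) {
        this.check = check;
        this.timer = timer;
    }

    int failures() {
        return failures;
    }

    // Queue the next tick; a real implementation would use a delayed timer.
    void schedule() {
        timer.add(this::runOnce);
    }

    private void runOnce() {
        check.get().whenComplete((ignored, ex) -> {
            if (ex != null) {
                failures++; // record the failure, but do not stop checking
            }
            schedule(); // re-arm whether the check succeeded or failed
        });
    }
}

final class PeriodicCheckDemo {
    public static void main(String[] args) {
        Queue<Runnable> timer = new ArrayDeque<>();
        int[] calls = {0};
        PeriodicCheck pc = new PeriodicCheck(() -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            if (++calls[0] == 1) {
                f.completeExceptionally(new IllegalStateException("lookup failed"));
            } else {
                f.complete(null);
            }
            return f;
        }, timer);
        pc.schedule();
        timer.poll().run(); // first tick fails, yet a new tick is queued
        timer.poll().run(); // second tick succeeds
        System.out.println("calls=" + calls[0] + " failures=" + pc.failures()
                + " queued=" + timer.size());
    }
}
```

Without the `schedule()` call after a failure, the queue would be empty after the first tick and no further checks would run, which is the symptom the regression test exercises.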