You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/25 06:56:03 UTC

[pulsar] branch branch-2.9 updated: [fix][txn] Topic transaction buffer recover don't close reader when throw RuntimeException (#15361)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 6fd31d101ba [fix][txn] Topic transaction buffer recover don't close reader when throw RuntimeException (#15361)
6fd31d101ba is described below

commit 6fd31d101bad252b512925ba733d8c636aa6307d
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue May 10 16:03:45 2022 +0800

    [fix][txn] Topic transaction buffer recover don't close reader when throw RuntimeException (#15361)
    
    Fixes: https://github.com/apache/pulsar/issues/14878
    
    ### Motivation
    Close the unused reader on the topicTransactionBufferSnapshot topic.
    
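    When the reader decodes the snapshot, it can throw a RuntimeException rather than a PulsarClientException. The recovery code only caught PulsarClientException, so in that case the reader was never closed.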
    
    We should catch Exception instead, then close the reader and the topic.
    
    ```
    "java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
            at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
            at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
            at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
            at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
            at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
            at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
            at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
            at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
            at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
            at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
            ... 8 more
    Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
            at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
            at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
            at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
            at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
            ... 8 more
    ```
    
    ### Modifications
    Catch Exception (instead of only PulsarClientException), then close the topic and the reader.
    
    (cherry picked from commit 0c58810d29838a161481f03c14990d0eb021a185)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java       |  8 ++++----
 .../broker/transaction/TopicTransactionBufferRecoverTest.java | 11 +++++++++--
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a348ccbb764..c889e006978 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -595,10 +595,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                     callBack.noNeedToRecover();
                                     return;
                                 }
-                            } catch (PulsarClientException pulsarClientException) {
-                                log.error("[{}]Transaction buffer recover fail when read "
-                                        + "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
-                                callBack.recoverExceptionally(pulsarClientException);
+                            } catch (Exception ex) {
+                                log.error("[{}] Transaction buffer recover fail when read "
+                                        + "transactionBufferSnapshot!", topic.getName(), ex);
+                                callBack.recoverExceptionally(ex);
                                 closeReader(reader);
                                 return;
                             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index fe724dd2be7..dddda0f962d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -457,7 +457,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
 
     @Test(timeOut=30000)
-    public void testTransactionBufferRecoverThrowPulsarClientException() throws Exception {
+    public void testTransactionBufferRecoverThrowException() throws Exception {
         String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
         @Cleanup
         Producer<byte[]> producer = pulsarClient
@@ -491,7 +491,14 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         field.setAccessible(true);
         TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
                 (TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
-        // mock reader can't read snapshot fail
+        // mock reader can't read snapshot fail throw RuntimeException
+        doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
+        // check reader close topic
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
+                transactionBufferSnapshotService, originalTopic, field, producer);
+        doReturn(true).when(reader).hasMoreEvents();
+
+        // mock reader can't read snapshot fail throw PulsarClientException
         doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
         // check reader close topic
         checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,