You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/25 06:56:03 UTC
[pulsar] branch branch-2.9 updated: [fix][txn] Topic transaction buffer recover don't close reader when throw RuntimeException (#15361)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 6fd31d101ba [fix][txn] Topic transaction buffer recover don't close reader when throw RuntimeException (#15361)
6fd31d101ba is described below
commit 6fd31d101bad252b512925ba733d8c636aa6307d
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue May 10 16:03:45 2022 +0800
[fix][txn] Topic transaction buffer recover don't close reader when throw RuntimeException (#15361)
Fixes: https://github.com/apache/pulsar/issues/14878
### Motivation
Clear unused readers on the topicTransactionBufferSnapshot topic.
When the reader decodes the snapshot, it can throw a RuntimeException rather than a PulsarClientException.
We should catch Exception, then close the reader and the topic.
```
"java.util.concurrent.CompletionException: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer.lambda$checkIfTBRecoverCompletely$3(TopicTransactionBuffer.java:232) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$1.recoverExceptionally(TopicTransactionBuffer.java:196) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$1(TopicTransactionBuffer.java:647) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) [?:?]
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) [?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) [?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
... 8 more
Caused by: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for 0
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:129) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache.get(LocalCache.java:3951) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[com.google.guava-guava-30.1-jre.jar:?]
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935) ~[com.google.guava-guava-30.1-jre.jar:?]
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:484) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:462) ~[org.apache-pulsar-client-original-2.9.2.jar:2.9.2]
at org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer$TopicTransactionBufferRecover.lambda$null$0(TopicTransactionBuffer.java:583) ~[org.apache-pulsar-broker-2.9.2.jar:2.9.2]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
... 8 more
```
### Modifications
Catch Exception, then close the topic and the reader.
(cherry picked from commit 0c58810d29838a161481f03c14990d0eb021a185)
---
.../transaction/buffer/impl/TopicTransactionBuffer.java | 8 ++++----
.../broker/transaction/TopicTransactionBufferRecoverTest.java | 11 +++++++++--
2 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a348ccbb764..c889e006978 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -595,10 +595,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
callBack.noNeedToRecover();
return;
}
- } catch (PulsarClientException pulsarClientException) {
- log.error("[{}]Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
- callBack.recoverExceptionally(pulsarClientException);
+ } catch (Exception ex) {
+ log.error("[{}] Transaction buffer recover fail when read "
+ + "transactionBufferSnapshot!", topic.getName(), ex);
+ callBack.recoverExceptionally(ex);
closeReader(reader);
return;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index fe724dd2be7..dddda0f962d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -457,7 +457,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
@Test(timeOut=30000)
- public void testTransactionBufferRecoverThrowPulsarClientException() throws Exception {
+ public void testTransactionBufferRecoverThrowException() throws Exception {
String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
@Cleanup
Producer<byte[]> producer = pulsarClient
@@ -491,7 +491,14 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
field.setAccessible(true);
TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
(TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
- // mock reader can't read snapshot fail
+ // mock reader can't read snapshot fail throw RuntimeException
+ doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
+ // check reader close topic
+ checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
+ transactionBufferSnapshotService, originalTopic, field, producer);
+ doReturn(true).when(reader).hasMoreEvents();
+
+ // mock reader can't read snapshot fail throw PulsarClientException
doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
// check reader close topic
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,