You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2023/02/09 12:46:26 UTC
[pulsar] branch branch-2.11 updated: [fix][txn] Always send correct transaction id in end txn response (#19137)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new a835a845db2 [fix][txn] Always send correct transaction id in end txn response (#19137)
a835a845db2 is described below
commit a835a845db24309721f91b1db2bb6f59078a9346
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Jan 10 07:14:47 2023 +0100
[fix][txn] Always send correct transaction id in end txn response (#19137)
(cherry picked from commit 90764e7a03034589764343202196508e7950584e)
---
.../pulsar/broker/service/PulsarCommandSender.java | 2 +-
.../broker/service/PulsarCommandSenderImpl.java | 7 +-
.../apache/pulsar/broker/service/ServerCnx.java | 44 ++--
.../pulsar/broker/service/ServerCnxTest.java | 285 ++++++++++++++++++++-
.../broker/service/utils/ClientChannelHelper.java | 35 ++-
.../client/impl/TransactionMetaStoreHandler.java | 62 +++--
.../apache/pulsar/common/protocol/Commands.java | 16 +-
7 files changed, 383 insertions(+), 68 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index dc5b97d846f..95c54718c36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -88,7 +88,7 @@ public interface PulsarCommandSender {
void sendNewTxnResponse(long requestId, TxnID txnID, long tcID);
- void sendNewTxnErrorResponse(long requestId, long txnID, ServerError error, String message);
+ void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error, String message);
void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index e61742668a9..8dd5a451b2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -332,8 +332,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
}
@Override
- public void sendNewTxnErrorResponse(long requestId, long txnID, ServerError error, String message) {
- BaseCommand command = Commands.newTxnResponse(requestId, txnID, error, message);
+ public void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error, String message) {
+ BaseCommand command = Commands.newTxnResponse(requestId, tcID, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
@@ -353,7 +353,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
@Override
public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError error, String message) {
- BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), error, message);
+ BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getLeastSigBits(),
+ txnID.getMostSigBits(), error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7e6600eb197..cc7c67e8cd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2367,7 +2367,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
} else {
ex = handleTxnException(ex, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
- ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
+ ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+ txnID.getLeastSigBits(),
+ txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
@@ -2412,7 +2414,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
final long requestId = command.getRequestId();
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
- TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+ final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
if (log.isDebugEnabled()) {
@@ -2448,7 +2450,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
ServerError.ServiceNotReady,
"The topic " + topic + " does not exist in broker.",
- txnID.getMostSigBits(), txnID.getLeastSigBits()));
+ txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, "
+ "txnId: [{}], txnAction: [{}]",
@@ -2457,13 +2459,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnPartition fail ! topic {}, "
- + "txnId: [{}], txnAction: [{}]", topic, txnID,
- TxnAction.valueOf(txnAction), e.getCause());
- ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
- requestId, ServerError.ServiceNotReady,
- e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
- return null;
+ log.error("handleEndTxnOnPartition fail ! topic {}, "
+ + "txnId: [{}], txnAction: [{}]", topic, txnID,
+ TxnAction.valueOf(txnAction), e.getCause());
+ ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+ requestId, ServerError.ServiceNotReady,
+ e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
+ return null;
});
}
}).exceptionally(e -> {
@@ -2517,7 +2519,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
- "Handle end txn on subscription failed."));
+ "Handle end txn on subscription failed: " + e.getMessage()));
return;
}
ctx.writeAndFlush(
@@ -2530,7 +2532,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (b) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, "
+ "subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName,
- new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
+ txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
@@ -2543,13 +2545,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
- + "txnId: [{}], txnAction: [{}]", topic, subName,
- txnID, TxnAction.valueOf(txnAction), e.getCause());
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
- ServerError.ServiceNotReady, e.getMessage()));
- return null;
+ log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
+ + "txnId: [{}], txnAction: [{}]", topic, subName,
+ txnID, TxnAction.valueOf(txnAction), e.getCause());
+ ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
+ ServerError.ServiceNotReady, e.getMessage()));
+ return null;
});
}
}).exceptionally(e -> {
@@ -2559,7 +2561,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
- "Handle end txn on subscription failed."));
+ "Handle end txn on subscription failed: " + e.getMessage()));
return null;
});
}
@@ -2616,7 +2618,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
} else {
ex = handleTxnException(ex, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
- ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+ ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 0c916988662..a8dcad16029 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,6 +22,10 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
@@ -75,13 +79,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -91,14 +95,20 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
@@ -114,6 +124,8 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -151,7 +163,6 @@ public class ServerCnxTest {
private ClientChannelHelper clientChannelHelper;
private PulsarService pulsar;
private MetadataStoreExtended store;
- private ConfigurationCacheService configCacheService;
private NamespaceResources namespaceResources;
protected NamespaceService namespaceService;
private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1]
@@ -2154,7 +2165,7 @@ public class ServerCnxTest {
return false;
}
};
- Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+ Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
// do test: delay complete after execute 'isDone()' many times
// Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
@@ -2193,12 +2204,12 @@ public class ServerCnxTest {
}
// case3: exists existingConsumerFuture, already complete and exception
CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
- Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+ Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
// make consumerFuture delay finish
Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
// when sync get return, future will return success value.
Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
- Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
+ Mockito.when(existingConsumerFuture.get(anyLong(), Mockito.any())).
thenThrow(new NullPointerException());
Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
@@ -2421,4 +2432,268 @@ public class ServerCnxTest {
assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady);
channel.finish();
}
+
+ @Test(timeOut = 30000)
+ public void sendAddPartitionToTxnResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+ List.of("tenant/ns/topic1"));
+ channel.writeInbound(clientCommand);
+ CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendAddPartitionToTxnResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.failedFuture(new RuntimeException("server error")));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+ List.of("tenant/ns/topic1"));
+ channel.writeInbound(clientCommand);
+ CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendAddSubscriptionToTxnResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ final Subscription sub = new Subscription();
+ sub.setTopic("topic1");
+ sub.setSubscription("sub1");
+ ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+ List.of(sub));
+ channel.writeInbound(clientCommand);
+ CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendAddSubscriptionToTxnResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.failedFuture(new RuntimeException("server error")));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ final Subscription sub = new Subscription();
+ sub.setTopic("topic1");
+ sub.setSubscription("sub1");
+ ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+ List.of(sub));
+ channel.writeInbound(clientCommand);
+ CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+ TxnAction.COMMIT));
+ channel.writeInbound(clientCommand);
+ CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.failedFuture(new RuntimeException("server error")));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+ TxnAction.COMMIT));
+ channel.writeInbound(clientCommand);
+ CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnPartitionResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+ doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
+ doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+ ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+ successTopicName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnPartitionResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+ doReturn(CompletableFuture.failedFuture(new RuntimeException("server error"))).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
+ doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+ ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+ successTopicName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnSubscription() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+ final org.apache.pulsar.broker.service.Subscription sub = mock(org.apache.pulsar.broker.service.Subscription.class);
+ doReturn(sub).when(topic).getSubscription(any());
+ doReturn(CompletableFuture.completedFuture(null))
+ .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+ doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+ ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+ successTopicName, successSubName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnSubscriptionFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+
+ final org.apache.pulsar.broker.service.Subscription sub = mock(org.apache.pulsar.broker.service.Subscription.class);
+ doReturn(sub).when(topic).getSubscription(any());
+ doReturn(CompletableFuture.failedFuture(new RuntimeException("server error")))
+ .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+ doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+ ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+ successTopicName, successSubName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "Handle end txn on subscription failed: server error");
+
+ channel.finish();
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index afd071a858b..f33b0fb093c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -19,7 +19,11 @@
package org.apache.pulsar.broker.service.utils;
import java.util.Queue;
-
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
@@ -168,6 +172,35 @@ public class ClientChannelHelper {
protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
queue.offer(new CommandPartitionedTopicMetadataResponse().copyFrom(response));
}
+
+ @Override
+ protected void handleAddPartitionToTxnResponse(
+ CommandAddPartitionToTxnResponse commandAddPartitionToTxnResponse) {
+ queue.offer(new CommandAddPartitionToTxnResponse().copyFrom(commandAddPartitionToTxnResponse));
+ }
+
+ @Override
+ protected void handleAddSubscriptionToTxnResponse(
+ CommandAddSubscriptionToTxnResponse commandAddSubscriptionToTxnResponse) {
+ queue.offer(new CommandAddSubscriptionToTxnResponse().copyFrom(commandAddSubscriptionToTxnResponse));
+ }
+
+ @Override
+ protected void handleEndTxnResponse(CommandEndTxnResponse commandEndTxnResponse) {
+ queue.offer(new CommandEndTxnResponse().copyFrom(commandEndTxnResponse));
+ }
+
+ @Override
+ protected void handleEndTxnOnPartitionResponse(
+ CommandEndTxnOnPartitionResponse commandEndTxnOnPartitionResponse) {
+ queue.offer(new CommandEndTxnOnPartitionResponse().copyFrom(commandEndTxnOnPartitionResponse));
+ }
+
+ @Override
+ protected void handleEndTxnOnSubscriptionResponse(
+ CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
+ queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
+ }
};
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index baff5deb3f1..15533cfd2a9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -217,9 +217,9 @@ public class TransactionMetaStoreHandler extends HandlerState
}
void handleNewTxnResponse(CommandNewTxnResponse response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -227,14 +227,13 @@ public class TransactionMetaStoreHandler extends HandlerState
error = null;
message = null;
}
- TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
- long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+ final long requestId = response.getRequestId();
internalPinnedExecutor.execute(() -> {
OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(requestId);
if (op == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got new txn response for timeout {} - {}", txnID.getMostSigBits(),
- txnID.getLeastSigBits());
+ LOG.debug("Got new txn response for transaction {}", txnID);
}
return;
}
@@ -301,9 +300,9 @@ public class TransactionMetaStoreHandler extends HandlerState
}
void handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -311,14 +310,13 @@ public class TransactionMetaStoreHandler extends HandlerState
error = null;
message = null;
}
- TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
- long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+ final long requestId = response.getRequestId();
internalPinnedExecutor.execute(() -> {
OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
if (op == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got add publish partition to txn response for timeout {} - {}", txnID.getMostSigBits(),
- txnID.getLeastSigBits());
+ LOG.debug("Got add publish partition to txn response for transaction {}", txnID);
}
return;
}
@@ -352,8 +350,8 @@ public class TransactionMetaStoreHandler extends HandlerState
, op.backoff.next(), TimeUnit.MILLISECONDS);
return;
}
- LOG.error("{} for request {} error {} with txnID {}.", BaseCommand.Type.ADD_PARTITION_TO_TXN.name(),
- requestId, error, txnID);
+ LOG.error("{} for request {}, transaction {}, error: {}",
+ BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId, txnID, error);
}
@@ -385,9 +383,9 @@ public class TransactionMetaStoreHandler extends HandlerState
}
public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -395,7 +393,8 @@ public class TransactionMetaStoreHandler extends HandlerState
error = null;
message = null;
}
- long requestId = response.getRequestId();
+ final long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
internalPinnedExecutor.execute(() -> {
OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
if (op == null) {
@@ -411,8 +410,8 @@ public class TransactionMetaStoreHandler extends HandlerState
}
op.callback.complete(null);
} else {
- LOG.error("Add subscription to txn failed for request {} error {}.",
- requestId, error);
+ LOG.error("Add subscription to txn failed for request {}, transaction {}, error: {}",
+ requestId, txnID, error);
if (checkIfNeedRetryByError(error, message, op)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Get a response for {} request {} error TransactionCoordinatorNotFound and try it"
@@ -466,9 +465,9 @@ public class TransactionMetaStoreHandler extends HandlerState
}
void handleEndTxnResponse(CommandEndTxnResponse response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -476,21 +475,20 @@ public class TransactionMetaStoreHandler extends HandlerState
error = null;
message = null;
}
- TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
- long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+ final long requestId = response.getRequestId();
internalPinnedExecutor.execute(() -> {
OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
if (op == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got end txn response for timeout {} - {}", txnID.getMostSigBits(),
- txnID.getLeastSigBits());
+ LOG.debug("Got end txn response for transaction but no requests pending for txn {}", txnID);
}
return;
}
if (!hasError) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got end txn response success for request {}", requestId);
+ LOG.debug("Got end txn response success for request {}, txn {}", requestId, txnID);
}
op.callback.complete(null);
} else {
@@ -517,8 +515,8 @@ public class TransactionMetaStoreHandler extends HandlerState
, op.backoff.next(), TimeUnit.MILLISECONDS);
return;
}
- LOG.error("Got {} response for request {} error {}", BaseCommand.Type.END_TXN.name(),
- requestId, error);
+ LOG.error("Got {} response for request {}, transaction {}, error: {}",
+ BaseCommand.Type.END_TXN.name(), requestId, txnID, error);
}
onResponse(op);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 61ce70a9a0a..99b229d500e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1344,12 +1344,16 @@ public class Commands {
return serializeWithSize(cmd);
}
- public static ByteBuf newAddPartitionToTxnResponse(long requestId, long txnIdMostBits, ServerError error,
- String errorMsg) {
+ public static ByteBuf newAddPartitionToTxnResponse(long requestId,
+ long txnIdLeastBits,
+ long txnIdMostBits,
+ ServerError error,
+ String errorMsg) {
BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN_RESPONSE);
CommandAddPartitionToTxnResponse response = cmd.setAddPartitionToTxnResponse()
.setRequestId(requestId)
.setError(error)
+ .setTxnidLeastBits(txnIdLeastBits)
.setTxnidMostBits(txnIdMostBits);
if (errorMsg != null) {
@@ -1378,12 +1382,13 @@ public class Commands {
return serializeWithSize(cmd);
}
- public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdMostBits, ServerError error,
- String errorMsg) {
+ public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdLeastBits,
+ long txnIdMostBits, ServerError error, String errorMsg) {
BaseCommand cmd = localCmd(Type.ADD_SUBSCRIPTION_TO_TXN_RESPONSE);
CommandAddSubscriptionToTxnResponse response = cmd.setAddSubscriptionToTxnResponse()
.setRequestId(requestId)
.setTxnidMostBits(txnIdMostBits)
+ .setTxnidLeastBits(txnIdLeastBits)
.setError(error);
if (errorMsg != null) {
response.setMessage(errorMsg);
@@ -1409,11 +1414,12 @@ public class Commands {
return cmd;
}
- public static BaseCommand newEndTxnResponse(long requestId, long txnIdMostBits,
+ public static BaseCommand newEndTxnResponse(long requestId, long txnIdLeastBits, long txnIdMostBits,
ServerError error, String errorMsg) {
BaseCommand cmd = localCmd(Type.END_TXN_RESPONSE);
CommandEndTxnResponse response = cmd.setEndTxnResponse()
.setRequestId(requestId)
+ .setTxnidLeastBits(txnIdLeastBits)
.setTxnidMostBits(txnIdMostBits)
.setError(error);
if (errorMsg != null) {