You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 11:17:36 UTC
[pulsar] branch branch-2.8 updated: [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages (#12779)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 82a4d31 [Java Client] Use epoch to version producer's cnx to prevent early de… (#12779)
82a4d31 is described below
commit 82a4d3169fc68e4ac0c912b0314b88aac2377f78
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Dec 9 13:00:44 2021 +0200
[Java Client] Use epoch to version producer's cnx to prevent early de… (#12779)
* [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages
* Update initial epoch value: it now gets incremented before the first connection
(cherry picked from commit ab652b8dbaecbb59e4a20e5b1f5ce93147502b24)
---
.../pulsar/client/impl/ConnectionHandler.java | 20 ++++++----
.../apache/pulsar/client/impl/HandlerState.java | 7 ++++
.../apache/pulsar/client/impl/ProducerImpl.java | 43 +++++++++++++++-------
3 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 13de530..13897d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -37,7 +37,8 @@ public class ConnectionHandler {
protected final Backoff backoff;
private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConnectionHandler.class, "epoch");
- private volatile long epoch = 0L;
+ // Start with -1L because it gets incremented before sending on the first connection
+ private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
interface Connection {
@@ -106,15 +107,10 @@ public class ConnectionHandler {
state.setState(State.Connecting);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
- incrementEpoch();
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}
- protected long incrementEpoch() {
- return EPOCH_UPDATER.incrementAndGet(this);
- }
-
public void connectionClosed(ClientCnx cnx) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
state.client.getCnxPool().releaseConnection(cnx);
@@ -129,7 +125,6 @@ public class ConnectionHandler {
delayMs / 1000.0);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
- incrementEpoch();
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}
@@ -147,6 +142,17 @@ public class ConnectionHandler {
CLIENT_CNX_UPDATER.set(this, clientCnx);
}
+ /**
+ * Update the {@link ClientCnx} for the class, then increment and get the epoch value. Note that the epoch value is
+ * currently only used by the {@link ProducerImpl}.
+ * @param clientCnx - the new {@link ClientCnx}
+ * @return the epoch value to use for this pair of {@link ClientCnx} and {@link ProducerImpl}
+ */
+ protected long switchClientCnx(ClientCnx clientCnx) {
+ setClientCnx(clientCnx);
+ return EPOCH_UPDATER.incrementAndGet(this);
+ }
+
private boolean isValidStateForReconnection() {
State state = this.state.getState();
switch (state) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index e72c97f..582df8c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -64,6 +64,13 @@ abstract class HandlerState {
return STATE_UPDATER.get(this);
}
+ protected boolean changeToConnecting() {
+ return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting)
+ || STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting)
+ || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting)
+ || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Connecting));
+ }
+
protected void setState(State s) {
STATE_UPDATER.set(this, s);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 20dd556..b8cc821 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -604,7 +604,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return true;
}
- private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback) {
+ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
if (!changeToRegisteringSchemaState()) {
return;
}
@@ -629,7 +629,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
cnx.ctx().channel().eventLoop().execute(() -> {
synchronized (ProducerImpl.this) {
- recoverProcessOpSendMsgFrom(cnx, msg);
+ recoverProcessOpSendMsgFrom(cnx, msg, expectedCnxEpoch);
}
});
return null;
@@ -1061,7 +1061,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
// as msg is not corrupted : let producer resend pending-messages again including checksum failed message
- resendMessages(cnx);
+ resendMessages(cnx, this.connectionHandler.getEpoch());
}
protected synchronized void recoverNotAllowedError(long sequenceId) {
@@ -1334,9 +1334,17 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
@Override
public void connectionOpened(final ClientCnx cnx) {
- // we set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating the
- // producer, it will try to grab a new cnx
- connectionHandler.setClientCnx(cnx);
+ final long epoch;
+ synchronized (this) {
+ // Because the state could have been updated while retrieving the connection, we set it back to connecting,
+ // as long as the change from current state to connecting is a valid state change.
+ if (!changeToConnecting()) {
+ return;
+ }
+ // We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating
+ // the producer, it will try to grab a new cnx. We also increment and get the epoch value for the producer.
+ epoch = connectionHandler.switchClientCnx(cnx);
+ }
cnx.registerProducer(producerId, this);
log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, cnx.ctx().channel());
@@ -1370,7 +1378,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata,
- schemaInfo, connectionHandler.getEpoch(), userProvidedProducerName,
+ schemaInfo, epoch, userProvidedProducerName,
conf.getAccessMode(), topicEpoch),
requestId).thenAccept(response -> {
String producerName = response.getProducerName();
@@ -1432,7 +1440,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
- resendMessages(cnx);
+ resendMessages(cnx, epoch);
}
}).exceptionally((e) -> {
Throwable cause = e.getCause();
@@ -1531,7 +1539,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
}
- private void resendMessages(ClientCnx cnx) {
+ private void resendMessages(ClientCnx cnx, long expectedEpoch) {
cnx.ctx().channel().eventLoop().execute(() -> {
synchronized (this) {
if (getState() == State.Closing || getState() == State.Closed) {
@@ -1558,7 +1566,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
log.info("[{}] [{}] Re-Sending {} messages to server", topic, producerName, messagesToResend);
- recoverProcessOpSendMsgFrom(cnx, null);
+ recoverProcessOpSendMsgFrom(cnx, null, expectedEpoch);
}
});
}
@@ -1788,7 +1796,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
if (shouldWriteOpSendMsg()) {
ClientCnx cnx = cnx();
if (op.msg != null && op.msg.getSchemaState() == None) {
- tryRegisterSchema(cnx, op.msg, op.callback);
+ tryRegisterSchema(cnx, op.msg, op.callback, this.connectionHandler.getEpoch());
return;
}
// If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
@@ -1819,7 +1827,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return isConnected();
}
- private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from) {
+ // Must acquire a lock on ProducerImpl.this before calling method.
+ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) {
+ if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) {
+ // In this case, the cnx passed to this method is no longer the active connection. This method will get
+ // called again once the new connection registers the producer with the broker.
+ log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
+ + " {} pending messages since they will deliver using another connection.", topic, producerName,
+ pendingMessages.size());
+ return;
+ }
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
OpSendMsg pendingRegisteringOp = null;
@@ -1868,7 +1885,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
return;
}
if (pendingRegisteringOp != null) {
- tryRegisterSchema(cnx, pendingRegisteringOp.msg, pendingRegisteringOp.callback);
+ tryRegisterSchema(cnx, pendingRegisteringOp.msg, pendingRegisteringOp.callback, expectedEpoch);
}
}