You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 11:17:36 UTC

[pulsar] branch branch-2.8 updated: [Java Client] Use epoch to version producer's cnx to prevent early de… (#12779)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 82a4d31  [Java Client] Use epoch to version producer's cnx to prevent early de… (#12779)
82a4d31 is described below

commit 82a4d3169fc68e4ac0c912b0314b88aac2377f78
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Dec 9 13:00:44 2021 +0200

    [Java Client] Use epoch to version producer's cnx to prevent early de… (#12779)
    
    * [Java Client] Use epoch to version producer's cnx to prevent early delivery of messages
    
    * Update initial epoch value: it now gets incremented before the first connection
    
    (cherry picked from commit ab652b8dbaecbb59e4a20e5b1f5ce93147502b24)
---
 .../pulsar/client/impl/ConnectionHandler.java      | 20 ++++++----
 .../apache/pulsar/client/impl/HandlerState.java    |  7 ++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 43 +++++++++++++++-------
 3 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 13de530..13897d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -37,7 +37,8 @@ public class ConnectionHandler {
     protected final Backoff backoff;
     private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConnectionHandler.class, "epoch");
-    private volatile long epoch = 0L;
+    // Start with -1L because it gets incremented before sending on the first connection
+    private volatile long epoch = -1L;
     protected volatile long lastConnectionClosedTimestamp = 0L;
 
     interface Connection {
@@ -106,15 +107,10 @@ public class ConnectionHandler {
         state.setState(State.Connecting);
         state.client.timer().newTimeout(timeout -> {
             log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
-            incrementEpoch();
             grabCnx();
         }, delayMs, TimeUnit.MILLISECONDS);
     }
 
-    protected long incrementEpoch() {
-        return EPOCH_UPDATER.incrementAndGet(this);
-    }
-
     public void connectionClosed(ClientCnx cnx) {
         lastConnectionClosedTimestamp = System.currentTimeMillis();
         state.client.getCnxPool().releaseConnection(cnx);
@@ -129,7 +125,6 @@ public class ConnectionHandler {
                     delayMs / 1000.0);
             state.client.timer().newTimeout(timeout -> {
                 log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
-                incrementEpoch();
                 grabCnx();
             }, delayMs, TimeUnit.MILLISECONDS);
         }
@@ -147,6 +142,17 @@ public class ConnectionHandler {
         CLIENT_CNX_UPDATER.set(this, clientCnx);
     }
 
+    /**
+     * Update the {@link ClientCnx} for the class, then increment and get the epoch value. Note that the epoch value is
+     * currently only used by the {@link ProducerImpl}.
+     * @param clientCnx - the new {@link ClientCnx}
+     * @return the epoch value to use for this pair of {@link ClientCnx} and {@link ProducerImpl}
+     */
+    protected long switchClientCnx(ClientCnx clientCnx) {
+        setClientCnx(clientCnx);
+        return EPOCH_UPDATER.incrementAndGet(this);
+    }
+
     private boolean isValidStateForReconnection() {
         State state = this.state.getState();
         switch (state) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index e72c97f..582df8c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -64,6 +64,13 @@ abstract class HandlerState {
         return STATE_UPDATER.get(this);
     }
 
+    protected boolean changeToConnecting() {
+        return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Connecting));
+    }
+
     protected void setState(State s) {
         STATE_UPDATER.set(this, s);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 20dd556..b8cc821 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -604,7 +604,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return true;
     }
 
-    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback) {
+    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
         if (!changeToRegisteringSchemaState()) {
             return;
         }
@@ -629,7 +629,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
             cnx.ctx().channel().eventLoop().execute(() -> {
                 synchronized (ProducerImpl.this) {
-                    recoverProcessOpSendMsgFrom(cnx, msg);
+                    recoverProcessOpSendMsgFrom(cnx, msg, expectedCnxEpoch);
                 }
             });
             return null;
@@ -1061,7 +1061,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
         }
         // as msg is not corrupted : let producer resend pending-messages again including checksum failed message
-        resendMessages(cnx);
+        resendMessages(cnx, this.connectionHandler.getEpoch());
     }
 
     protected synchronized void recoverNotAllowedError(long sequenceId) {
@@ -1334,9 +1334,17 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     @Override
     public void connectionOpened(final ClientCnx cnx) {
-        // we set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating the
-        // producer, it will try to grab a new cnx
-        connectionHandler.setClientCnx(cnx);
+        final long epoch;
+        synchronized (this) {
+            // Because the state could have been updated while retrieving the connection, we set it back to connecting,
+            // as long as the change from current state to connecting is a valid state change.
+            if (!changeToConnecting()) {
+                return;
+            }
+            // We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating
+            // the producer, it will try to grab a new cnx. We also increment and get the epoch value for the producer.
+            epoch = connectionHandler.switchClientCnx(cnx);
+        }
         cnx.registerProducer(producerId, this);
 
         log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, cnx.ctx().channel());
@@ -1370,7 +1378,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata,
-                       schemaInfo, connectionHandler.getEpoch(), userProvidedProducerName,
+                       schemaInfo, epoch, userProvidedProducerName,
                        conf.getAccessMode(), topicEpoch),
                 requestId).thenAccept(response -> {
                     String producerName = response.getProducerName();
@@ -1432,7 +1440,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                                 }
                             }, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
                         }
-                        resendMessages(cnx);
+                        resendMessages(cnx, epoch);
                     }
                 }).exceptionally((e) -> {
                     Throwable cause = e.getCause();
@@ -1531,7 +1539,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         }
     }
 
-    private void resendMessages(ClientCnx cnx) {
+    private void resendMessages(ClientCnx cnx, long expectedEpoch) {
         cnx.ctx().channel().eventLoop().execute(() -> {
             synchronized (this) {
                 if (getState() == State.Closing || getState() == State.Closed) {
@@ -1558,7 +1566,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 }
 
                 log.info("[{}] [{}] Re-Sending {} messages to server", topic, producerName, messagesToResend);
-                recoverProcessOpSendMsgFrom(cnx, null);
+                recoverProcessOpSendMsgFrom(cnx, null, expectedEpoch);
             }
         });
     }
@@ -1788,7 +1796,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             if (shouldWriteOpSendMsg()) {
                 ClientCnx cnx = cnx();
                 if (op.msg != null && op.msg.getSchemaState() == None) {
-                    tryRegisterSchema(cnx, op.msg, op.callback);
+                    tryRegisterSchema(cnx, op.msg, op.callback, this.connectionHandler.getEpoch());
                     return;
                 }
                 // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
@@ -1819,7 +1827,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return isConnected();
     }
 
-    private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from) {
+    // Must acquire a lock on ProducerImpl.this before calling method.
+    private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) {
+        if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) {
+            // In this case, the cnx passed to this method is no longer the active connection. This method will get
+            // called again once the new connection registers the producer with the broker.
+            log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
+                    + " {} pending messages since they will deliver using another connection.", topic, producerName,
+                    pendingMessages.size());
+            return;
+        }
         final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
         Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
         OpSendMsg pendingRegisteringOp = null;
@@ -1868,7 +1885,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             return;
         }
         if (pendingRegisteringOp != null) {
-            tryRegisterSchema(cnx, pendingRegisteringOp.msg, pendingRegisteringOp.callback);
+            tryRegisterSchema(cnx, pendingRegisteringOp.msg, pendingRegisteringOp.callback, expectedEpoch);
         }
     }