Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/03/30 13:19:40 UTC

[GitHub] [cassandra] jasonstack opened a new pull request #497: Cassandra 15666 trunk

jasonstack opened a new pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r407449400
 
 

 ##########
 File path: .circleci/config.yml
 ##########
 @@ -3,10 +3,10 @@ jobs:
   j8_jvm_upgrade_dtests:
     docker:
     - image: spod/cassandra-testing-ubuntu1810-java11-w-dependencies:20190306
-    resource_class: medium
+    resource_class: xlarge
 
 Review comment:
   To speed up CircleCI; not to be merged.


[GitHub] [cassandra] blerer commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405412510
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -399,22 +446,25 @@ synchronized void addTransferStreams(Collection<OutgoingStream> streams)
 
     private synchronized Future closeSession(State finalState)
     {
-        Future abortedTasksFuture = null;
-        if (isAborted.compareAndSet(false, true))
+        if (isClosed.compareAndSet(false, true))
 
 Review comment:
   `isClosed` is only used in `closeSession`, and that method is synchronized. As a consequence, there is no real need to use an `AtomicBoolean`; a plain `boolean` variable should be enough.
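A minimal sketch of the reviewer's point, assuming a hypothetical `CloseOnceSession` class that models only the close-once guard (it is not the real `StreamSession`): because every read and write of the flag happens while holding the object's monitor, a plain `boolean` already gives both atomicity of the check-then-set and visibility across threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamSession: only the close-once guard is modelled.
class CloseOnceSession
{
    // A plain boolean suffices: it is only read and written while holding this object's monitor.
    private boolean isClosed = false;
    private int closeCount = 0; // how many times the close work actually ran

    synchronized boolean closeSession()
    {
        if (isClosed)
            return false; // an earlier caller already closed the session
        isClosed = true;
        closeCount++;     // stands in for the real close work
        return true;
    }

    synchronized int closeCount()
    {
        return closeCount;
    }

    public static void main(String[] args) throws InterruptedException
    {
        CloseOnceSession session = new CloseOnceSession();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 8; i++)
        {
            Thread t = new Thread(session::closeSession);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads)
            t.join();
        System.out.println("close work ran " + session.closeCount() + " time(s)"); // prints "close work ran 1 time(s)"
    }
}
```

An `AtomicBoolean` would only be needed if the flag were also touched outside the synchronized method.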


[GitHub] [cassandra] blerer commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405509994
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -661,7 +718,34 @@ public void received(TableId tableId, int sequenceNumber)
      */
     public synchronized void complete()
     {
-        logger.debug("handling Complete message, state = {}, completeSent = {}", state, completeSent);
+        logger.debug("[Stream #{}] handling Complete message, state = {}, completeSent = {}", planId(), state, completeSent);
+        if (state == State.WAIT_COMPLETE)
+        {
+            if (!completeSent)
+            {
+                messageSender.sendMessage(new CompleteMessage());
+                completeSent = true;
+            }
+            closeSession(State.COMPLETE);
+        }
+        else
+        {
+            state(State.WAIT_COMPLETE);
+        }
+    }
+
+    /**
+     * Synchronize both {@link #complete()} and {@link #maybeCompleted()} to avoid racing
+     */
+    private synchronized boolean maybeCompleted()
+    {
+        if (!(receivers.isEmpty() && transfers.isEmpty()))
+            return false;
+
+        // if already executed once, skip it
+        if (!maybeCompleted.compareAndSet(false, true))
 
 Review comment:
   As `maybeCompleted` is only used within this synchronized method, there is no need to use an `AtomicBoolean`.


[GitHub] [cassandra] blerer commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405499276
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -399,22 +446,25 @@ synchronized void addTransferStreams(Collection<OutgoingStream> streams)
 
     private synchronized Future closeSession(State finalState)
     {
-        Future abortedTasksFuture = null;
-        if (isAborted.compareAndSet(false, true))
+        if (isClosed.compareAndSet(false, true))
         {
             state(finalState);
 
+            List<Future> futures = new ArrayList<>();
+
             // ensure aborting the tasks do not happen on the network IO thread (read: netty event loop)
             // as we don't want any blocking disk IO to stop the network thread
             if (finalState == State.FAILED)
-                abortedTasksFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
+                futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
 
-            incomingChannels.values().stream().map(channel -> channel.close());
+            logger.debug("[Stream #{}] Will close attached inbound channels {}", planId(), incomingChannels);
+            incomingChannels.values().forEach(channel -> futures.add(channel.close()));
             messageSender.close();
 
             streamResult.handleSessionComplete(this);
+            closeFuture = FBUtilities.allOf(futures);
         }
-        return abortedTasksFuture != null ? abortedTasksFuture : Futures.immediateFuture(null);
+        return closeFuture != null ? closeFuture : Futures.immediateFuture(null);
 
 Review comment:
   The return line is confusing. In practice the code should always return `closeFuture`, since execution always enters the `if` block the first time the method is called. I would remove the `isClosed` variable and rely on whether `closeFuture` is `null` or not. A comment would also help to clarify the code logic.
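A minimal sketch of the suggested shape, assuming a hypothetical `FutureGuardedSession` class: the future itself doubles as the "already closed" marker, so the first call does the close work and every later call returns the same future. It uses the JDK's `CompletableFuture.allOf` in place of Cassandra's `FBUtilities.allOf`, and the aborted-task and channel-close futures are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: closeFuture doubles as the "already closed" marker, so no isClosed flag is needed.
class FutureGuardedSession
{
    // null until the first closeSession() call; only accessed while holding this object's monitor
    private CompletableFuture<Void> closeFuture;

    synchronized CompletableFuture<Void> closeSession(boolean failed)
    {
        // The first call performs the close work; every later call returns the same future.
        if (closeFuture == null)
        {
            List<CompletableFuture<?>> futures = new ArrayList<>();
            if (failed)
                futures.add(CompletableFuture.runAsync(() -> { /* abort tasks off the network thread */ }));
            futures.add(CompletableFuture.completedFuture(null)); // stands in for closing one inbound channel
            closeFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        }
        return closeFuture;
    }

    public static void main(String[] args)
    {
        FutureGuardedSession session = new FutureGuardedSession();
        CompletableFuture<Void> first = session.closeSession(true);
        CompletableFuture<Void> second = session.closeSession(false);
        first.join(); // waits for the aborted tasks and channel closes
        System.out.println("same future: " + (first == second)); // prints "same future: true"
    }
}
```

With this shape the fallback to an immediate future disappears, because the method can never reach the `return` with `closeFuture` still `null`.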


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405573119
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -399,22 +446,25 @@ synchronized void addTransferStreams(Collection<OutgoingStream> streams)
 
     private synchronized Future closeSession(State finalState)
     {
-        Future abortedTasksFuture = null;
-        if (isAborted.compareAndSet(false, true))
+        if (isClosed.compareAndSet(false, true))
         {
             state(finalState);
 
+            List<Future> futures = new ArrayList<>();
+
             // ensure aborting the tasks do not happen on the network IO thread (read: netty event loop)
             // as we don't want any blocking disk IO to stop the network thread
             if (finalState == State.FAILED)
-                abortedTasksFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
+                futures.add(ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks));
 
-            incomingChannels.values().stream().map(channel -> channel.close());
+            logger.debug("[Stream #{}] Will close attached inbound channels {}", planId(), incomingChannels);
+            incomingChannels.values().forEach(channel -> futures.add(channel.close()));
             messageSender.close();
 
             streamResult.handleSessionComplete(this);
+            closeFuture = FBUtilities.allOf(futures);
         }
-        return abortedTasksFuture != null ? abortedTasksFuture : Futures.immediateFuture(null);
+        return closeFuture != null ? closeFuture : Futures.immediateFuture(null);
 
 Review comment:
   Good idea; replaced `isClosed` with `closeFuture`.


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r408108762
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
 ##########
 @@ -262,7 +254,7 @@ else if (message instanceof IncomingStreamMessage)
             if (streamSession == null)
                 throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
 
-            streamSession.attach(channel);
+            streamSession.attachInbound(channel);
 
 Review comment:
   I didn't add an `isControlConnect` param to `attachInbound`, because:
   * on the initiator side, the control channel has already been created;
   * on the follower side, the control channel is attached when receiving `StreamInitMessage`, so the control connection won't be attached twice.
   
   I will add it anyway to improve clarity.
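A minimal sketch of what an explicit `isControlConnect` flag could look like, assuming a hypothetical `AttachSketch` class where a channel is just a `String` id and the message sender only remembers its control channel. Only the channel that delivered `StreamInitMessage` passes `true`; data channels attached on `IncomingStreamMessage` pass `false`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: a channel is just a String id, and the message sender only remembers its control channel.
class AttachSketch
{
    private String controlChannel; // injected at most once; guarded by this
    private final Map<String, String> channels = new ConcurrentHashMap<>();

    /**
     * @param isControlConnect true only for the channel that delivered StreamInitMessage
     * @return false if the channel was already attached, true otherwise
     */
    synchronized boolean attachInbound(String channelId, boolean isControlConnect)
    {
        if (isControlConnect && controlChannel == null)
            controlChannel = channelId;
        return channels.putIfAbsent(channelId, channelId) == null;
    }

    synchronized String controlChannel()
    {
        return controlChannel;
    }

    public static void main(String[] args)
    {
        AttachSketch session = new AttachSketch();
        session.attachInbound("init-channel", true);  // StreamInitMessage arrived here
        session.attachInbound("data-channel", false); // IncomingStreamMessage arrived here
        System.out.println("control channel: " + session.controlChannel()); // prints "control channel: init-channel"
    }
}
```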


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405572743
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -661,7 +718,34 @@ public void received(TableId tableId, int sequenceNumber)
      */
     public synchronized void complete()
     {
-        logger.debug("handling Complete message, state = {}, completeSent = {}", state, completeSent);
+        logger.debug("[Stream #{}] handling Complete message, state = {}, completeSent = {}", planId(), state, completeSent);
+        if (state == State.WAIT_COMPLETE)
+        {
+            if (!completeSent)
+            {
+                messageSender.sendMessage(new CompleteMessage());
+                completeSent = true;
+            }
+            closeSession(State.COMPLETE);
+        }
+        else
+        {
+            state(State.WAIT_COMPLETE);
+        }
+    }
+
+    /**
+     * Synchronize both {@link #complete()} and {@link #maybeCompleted()} to avoid racing
+     */
+    private synchronized boolean maybeCompleted()
+    {
+        if (!(receivers.isEmpty() && transfers.isEmpty()))
+            return false;
+
+        // if already executed once, skip it
+        if (!maybeCompleted.compareAndSet(false, true))
 
 Review comment:
   +1


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r402406186
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -156,18 +166,57 @@
     private final NettyStreamingMessageSender messageSender;
     private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
 
-    private final AtomicBoolean isAborted = new AtomicBoolean(false);
+    // "maybeCompleted()" should be executed at most once. Because it can be executed asynchronously by IO
+    // threads(serialization/deserialization) and stream messaging processing thread, causing connection closed before
+    // receiving peer's CompleteMessage.
+    private final AtomicBoolean maybeCompleted = new AtomicBoolean(false);
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private Future closeFuture;
+
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
+    /**
+     * State Transition:
+     *
+     * <pre>
+     *
+     *  +------------------+----------> FAILED <------------------+
+     *  |                  |              ^                       |
+     *  |                  |              |                       |
+     *  INITIALIZED --> PREPARING --> STREAMING ----------> WAIT_COMPLETE ----> COMPLETED
+     *  |                  |                                      ^                 ^
+     *  |                  |         if preview                   |                 |
+     *  |                  +--------------------------------------+                 |
+     *  |               nothing to request or to transfer                           |
+     *  +---------------------------------------------------------------------------+
+     *                  nothing to request or to transfer
+     *
+     *  </pre>
+     */
     public enum State
     {
-        INITIALIZED,
-        PREPARING,
-        STREAMING,
-        WAIT_COMPLETE,
-        COMPLETE,
-        FAILED,
+        INITIALIZED(false),
+        PREPARING(false),
+        STREAMING(false),
+        WAIT_COMPLETE(false),
+        COMPLETE(true),
+        FAILED(true);
+
+        private final boolean finalState;
+
+        State(boolean finalState)
+        {
+            this.finalState = finalState;
+        }
+
+        /**
+         * @return true if current statu is final and cannot be change to other state.
 
 Review comment:
   nit: `statu` -> `status`, `change` -> `changed`.


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r402417840
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -830,4 +893,56 @@ public int getNumTransfers()
     {
         return transferredRangesPerKeyspace.size();
     }
+
+    public static class MessageStateSink
+    {
+        // use enum ordinal instead of enum to walk around inter-jvm class loader issue, only classes defined in
+        // InstanceClassLoader#sharedClassNames are shareable between server jvm and test jvm
+        public final Map<InetAddress, Deque<Integer>> messageSink = new ConcurrentHashMap<>();
+        public final Map<InetAddress, Deque<Integer>> stateTransitions = new ConcurrentHashMap<>();
+
+        private boolean skipKeepAlive = true;
 
 Review comment:
   +1


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r402421588
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -156,18 +166,57 @@
     private final NettyStreamingMessageSender messageSender;
     private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
 
-    private final AtomicBoolean isAborted = new AtomicBoolean(false);
+    // "maybeCompleted()" should be executed at most once. Because it can be executed asynchronously by IO
+    // threads(serialization/deserialization) and stream messaging processing thread, causing connection closed before
+    // receiving peer's CompleteMessage.
+    private final AtomicBoolean maybeCompleted = new AtomicBoolean(false);
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private Future closeFuture;
+
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
+    /**
+     * State Transition:
+     *
+     * <pre>
+     *
+     *  +------------------+----------> FAILED <------------------+
+     *  |                  |              ^                       |
+     *  |                  |              |                       |
+     *  INITIALIZED --> PREPARING --> STREAMING ----------> WAIT_COMPLETE ----> COMPLETED
+     *  |                  |                                      ^                 ^
+     *  |                  |         if preview                   |                 |
+     *  |                  +--------------------------------------+                 |
+     *  |               nothing to request or to transfer                           |
+     *  +---------------------------------------------------------------------------+
+     *                  nothing to request or to transfer
+     *
+     *  </pre>
+     */
     public enum State
     {
-        INITIALIZED,
-        PREPARING,
-        STREAMING,
-        WAIT_COMPLETE,
-        COMPLETE,
-        FAILED,
+        INITIALIZED(false),
+        PREPARING(false),
+        STREAMING(false),
+        WAIT_COMPLETE(false),
+        COMPLETE(true),
+        FAILED(true);
+
+        private final boolean finalState;
+
+        State(boolean finalState)
+        {
+            this.finalState = finalState;
+        }
+
+        /**
+         * @return true if current statu is final and cannot be change to other state.
 
 Review comment:
   Updated the javadoc. State transitions are not enforced; leaving that to [C-14754](https://issues.apache.org/jira/browse/CASSANDRA-14754).
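For reference, a sketch of the `State` enum from the diff with the javadoc wording corrected as suggested. The accessor name `isFinalState()` is an assumption (the diff is cut off before the method signature), and the enum is nested in a hypothetical `StateSketch` class so it can run on its own. As noted above, nothing here enforces valid transitions.

```java
class StateSketch
{
    // Mirrors the enum in the diff; isFinalState() is an assumed accessor name.
    // Transitions between states are not enforced here (see CASSANDRA-14754).
    enum State
    {
        INITIALIZED(false),
        PREPARING(false),
        STREAMING(false),
        WAIT_COMPLETE(false),
        COMPLETE(true),
        FAILED(true);

        private final boolean finalState;

        State(boolean finalState)
        {
            this.finalState = finalState;
        }

        /**
         * @return true if the current status is final and cannot be changed to another state.
         */
        boolean isFinalState()
        {
            return finalState;
        }
    }

    public static void main(String[] args)
    {
        for (State state : State.values())
            System.out.println(state + " final=" + state.isFinalState());
    }
}
```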


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r408079654
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
 ##########
 @@ -262,7 +254,7 @@ else if (message instanceof IncomingStreamMessage)
             if (streamSession == null)
                 throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
 
-            streamSession.attach(channel);
+            streamSession.attachInbound(channel);
 
 Review comment:
   Shouldn't we only attach the inbound control channel when receiving a `StreamInitMessage`?


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r406321694
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamCoordinator.java
 ##########
 @@ -42,17 +42,19 @@
     private final Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>();
     private final StreamOperation streamOperation;
     private final int connectionsPerHost;
+    private final boolean initiator;
     private StreamConnectionFactory factory;
     private Iterator<StreamSession> sessionsToConnect = null;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
     public StreamCoordinator(StreamOperation streamOperation, int connectionsPerHost, StreamConnectionFactory factory,
-                             boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
+                             boolean initiator, boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
 
 Review comment:
   For consistency with `StreamSession`, what about passing "follower", rather than "initiator"?


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405578203
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -830,4 +893,57 @@ public int getNumTransfers()
     {
         return transferredRangesPerKeyspace.size();
     }
+
+    @VisibleForTesting
+    public static class MessageStateSink
 
 Review comment:
   That's what I tried initially.
   
   The jvm-dtest framework currently doesn't support fetching non-primitive objects from the server JVM to the test JVM, because of class loading.
   
   One way is to put the verification code in the implementation and let it execute on the server JVM. I will update it.
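The `MessageStateSink` diff works around the class-loader restriction by recording enum ordinals instead of enum values. A minimal sketch of that encoding, assuming a hypothetical `OrdinalSink` class and a `ConcurrentLinkedDeque` per peer (the diff only shows the `Map<InetAddress, Deque<Integer>>` type): the server side stores only JDK types, and the test side maps each ordinal back to its own copy of the enum.

```java
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical sketch of recording enum ordinals so the data can cross class loaders.
class OrdinalSink
{
    enum State { INITIALIZED, PREPARING, STREAMING, WAIT_COMPLETE, COMPLETE, FAILED }

    // Server side: only JDK types (InetAddress, Integer) are stored.
    final Map<InetAddress, Deque<Integer>> stateTransitions = new ConcurrentHashMap<>();

    void recordState(InetAddress peer, State state)
    {
        stateTransitions.computeIfAbsent(peer, p -> new ConcurrentLinkedDeque<>()).add(state.ordinal());
    }

    // Test side: map ordinals back to the test JVM's own copy of the enum.
    static List<State> decode(Deque<Integer> ordinals)
    {
        List<State> states = new ArrayList<>();
        for (int ordinal : ordinals)
            states.add(State.values()[ordinal]);
        return states;
    }

    public static void main(String[] args)
    {
        OrdinalSink sink = new OrdinalSink();
        InetAddress peer = InetAddress.getLoopbackAddress();
        sink.recordState(peer, State.PREPARING);
        sink.recordState(peer, State.STREAMING);
        System.out.println(decode(sink.stateTransitions.get(peer))); // prints "[PREPARING, STREAMING]"
    }
}
```

This only works while both class loaders see the same constant order, so reordering the enum would silently corrupt the decoded transitions.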


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r409684795
 
 

 ##########
 File path: .circleci/config.yml
 ##########
 @@ -85,16 +85,16 @@ jobs:
     - CASS_DRIVER_NO_EXTENSIONS: true
     - CASS_DRIVER_NO_CYTHON: true
     - CASSANDRA_SKIP_SYNC: true
-    - DTEST_REPO: git://github.com/apache/cassandra-dtest.git
-    - DTEST_BRANCH: master
+    - DTEST_REPO: git://github.com/jasonstack/cassandra-dtest.git
+    - DTEST_BRANCH: CASSANDRA-15666
 
 Review comment:
   Redirect CircleCI to use the custom dtest branch; test run pending: https://circleci.com/workflow-run/85838845-bef8-4015-aff9-8a2246c348c2


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r406343404
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -295,11 +300,39 @@ public void init(StreamResultFuture streamResult)
      * @param channel The channel to attach.
      * @return False if the channel was already attached, true otherwise.
      */
-    public boolean attachInbound(Channel channel)
+    public synchronized boolean attachInbound(Channel channel)
     {
+        failIfFinished();
+
         if (!messageSender.hasControlChannel())
             messageSender.injectControlMessageChannel(channel);
-        return incomingChannels.putIfAbsent(channel.id(), channel) == null;
+
+        channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+        return channels.putIfAbsent(channel.id(), channel) == null;
+    }
+
+    /**
+     * Attach a channel to this session upon sending the first outbound message.
+     *
+     * @param channel The channel to attach.
+     * @return False if the channel was already attached, true otherwise.
+     */
+    public synchronized boolean attachOutbound(Channel channel)
+    {
+        failIfFinished();
+
+        channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+        return channels.putIfAbsent(channel.id(), channel) == null;
+    }
+
+    /**
+     * On channel closing, if no channels are left just close the message sender; this must be closed last to ensure
+     * keep alive messages are sent until the very end of the streaming session.
+     */
+    private void onChannelClose(Channel channel)
 
 Review comment:
   You are right, added `synchronized`.


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r406309640
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -295,11 +300,39 @@ public void init(StreamResultFuture streamResult)
      * @param channel The channel to attach.
      * @return False if the channel was already attached, true otherwise.
      */
-    public boolean attachInbound(Channel channel)
+    public synchronized boolean attachInbound(Channel channel)
     {
+        failIfFinished();
+
         if (!messageSender.hasControlChannel())
             messageSender.injectControlMessageChannel(channel);
-        return incomingChannels.putIfAbsent(channel.id(), channel) == null;
+
+        channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+        return channels.putIfAbsent(channel.id(), channel) == null;
+    }
+
+    /**
+     * Attach a channel to this session upon sending the first outbound message.
+     *
+     * @param channel The channel to attach.
+     * @return False if the channel was already attached, true otherwise.
+     */
+    public synchronized boolean attachOutbound(Channel channel)
+    {
+        failIfFinished();
+
+        channel.closeFuture().addListener(ignored -> onChannelClose(channel));
+        return channels.putIfAbsent(channel.id(), channel) == null;
+    }
+
+    /**
+     * On channel closing, if no channels are left just close the message sender; this must be closed last to ensure
+     * keep alive messages are sent until the very end of the streaming session.
+     */
+    private void onChannelClose(Channel channel)
 
 Review comment:
   This is called from different channels, and therefore possibly from different event loop threads; shouldn't it be synchronized?
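A minimal sketch of why the close callback needs the same lock as attach. It assumes a hypothetical `FakeChannel` in place of Netty's `Channel`, and a boolean in place of closing `NettyStreamingMessageSender`. Removing a channel and then checking whether the map is empty is a compound action. Without the lock, two close callbacks racing on different event loop threads could both observe an empty map and close the sender twice. They could also interleave with a concurrent attach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Netty's Channel: an id plus close listeners.
class FakeChannel
{
    final String id;
    private final List<Runnable> closeListeners = new ArrayList<>();

    FakeChannel(String id) { this.id = id; }

    synchronized void onClose(Runnable listener) { closeListeners.add(listener); }

    // Runs listeners outside the channel's own lock, as an event loop thread would.
    void close()
    {
        List<Runnable> snapshot;
        synchronized (this) { snapshot = new ArrayList<>(closeListeners); }
        snapshot.forEach(Runnable::run);
    }
}

// Sketch of the session-side bookkeeping; not the PR's StreamSession.
class SessionChannels
{
    private final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();
    private boolean senderClosed = false; // guarded by this

    synchronized boolean attach(FakeChannel channel)
    {
        if (senderClosed) // stands in for failIfFinished()
            throw new IllegalStateException("session already finished");
        channel.onClose(() -> onChannelClose(channel));
        return channels.putIfAbsent(channel.id, channel) == null;
    }

    // Remove-then-check-empty must be atomic with respect to other closes and attaches.
    private synchronized void onChannelClose(FakeChannel channel)
    {
        channels.remove(channel.id);
        if (channels.isEmpty())
            senderClosed = true; // stands in for closing the message sender last
    }

    synchronized boolean isSenderClosed() { return senderClosed; }
}
```

The channel's listeners run after its own lock is released, so the only lock taken inside a close callback is the session's. This keeps lock ordering one-directional.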


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r402407051
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -156,18 +166,57 @@
     private final NettyStreamingMessageSender messageSender;
     private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
 
-    private final AtomicBoolean isAborted = new AtomicBoolean(false);
+    // "maybeCompleted()" should be executed at most once: it can be executed asynchronously by IO
+    // threads (serialization/deserialization) and by the stream message processing thread, which could close the
+    // connection before the peer's CompleteMessage is received.
+    private final AtomicBoolean maybeCompleted = new AtomicBoolean(false);
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private Future closeFuture;
+
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
+    /**
+     * State Transition:
+     *
+     * <pre>
+     *
+     *  +------------------+----------> FAILED <------------------+
+     *  |                  |              ^                       |
+     *  |                  |              |                       |
+     *  INITIALIZED --> PREPARING --> STREAMING ----------> WAIT_COMPLETE ----> COMPLETED
+     *  |                  |                                      ^                 ^
+     *  |                  |         if preview                   |                 |
+     *  |                  +--------------------------------------+                 |
+     *  |               nothing to request or to transfer                           |
+     *  +---------------------------------------------------------------------------+
+     *                  nothing to request or to transfer
+     *
+     *  </pre>
+     */
     public enum State
     {
-        INITIALIZED,
-        PREPARING,
-        STREAMING,
-        WAIT_COMPLETE,
-        COMPLETE,
-        FAILED,
+        INITIALIZED(false),
+        PREPARING(false),
+        STREAMING(false),
+        WAIT_COMPLETE(false),
+        COMPLETE(true),
+        FAILED(true);
+
+        private final boolean finalState;
+
+        State(boolean finalState)
+        {
+            this.finalState = finalState;
+        }
+
+        /**
+         * @return true if the current state is final and cannot be changed to another state.
 
 Review comment:
   Also, this doesn't seem to actually be enforced? That is, we still allow going from a final state to a different state.
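One way to actually enforce final states, sketched under assumptions. The edge set below is a reading of the ASCII diagram in the diff: INITIALIZED can jump straight to COMPLETE when there is nothing to request or transfer, PREPARING can jump to WAIT_COMPLETE for previews, and every non-final state can fail. `SessionState` and `StateHolder.transition` are hypothetical names, not the PR's code. Any move out of COMPLETE or FAILED, or along an edge not in the table, is rejected.

```java
import java.util.EnumSet;
import java.util.Set;

enum SessionState
{
    INITIALIZED(false), PREPARING(false), STREAMING(false), WAIT_COMPLETE(false), COMPLETE(true), FAILED(true);

    private final boolean finalState;

    SessionState(boolean finalState) { this.finalState = finalState; }

    boolean isFinalState() { return finalState; }

    // Allowed successors, as read off the state diagram.
    Set<SessionState> successors()
    {
        switch (this)
        {
            case INITIALIZED:   return EnumSet.of(PREPARING, COMPLETE, FAILED);
            case PREPARING:     return EnumSet.of(STREAMING, WAIT_COMPLETE, FAILED);
            case STREAMING:     return EnumSet.of(WAIT_COMPLETE, FAILED);
            case WAIT_COMPLETE: return EnumSet.of(COMPLETE, FAILED);
            default:            return EnumSet.noneOf(SessionState.class); // final states
        }
    }
}

class StateHolder
{
    private SessionState state = SessionState.INITIALIZED; // guarded by this

    // Returns false and leaves the state unchanged for any illegal move.
    synchronized boolean transition(SessionState next)
    {
        if (state.isFinalState() || !state.successors().contains(next))
            return false;
        state = next;
        return true;
    }

    synchronized SessionState state() { return state; }
}
```

Returning false rather than throwing is a design choice. Throwing or asserting would surface illegal transitions in tests more loudly.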


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r402403657
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -830,4 +893,56 @@ public int getNumTransfers()
     {
         return transferredRangesPerKeyspace.size();
     }
+
+    public static class MessageStateSink
+    {
+        // use enum ordinals instead of enums to work around an inter-JVM class loader issue: only classes defined in
+        // InstanceClassLoader#sharedClassNames are shareable between the server JVM and the test JVM
+        public final Map<InetAddress, Deque<Integer>> messageSink = new ConcurrentHashMap<>();
+        public final Map<InetAddress, Deque<Integer>> stateTransitions = new ConcurrentHashMap<>();
+
+        private boolean skipKeepAlive = true;
 
 Review comment:
   Shouldn't these two be `volatile`?
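A minimal sketch of the point behind the `volatile` question. It assumes the flags are toggled by the test thread while IO threads read them. `RecordingSink`, `setRecording` and the second flag are hypothetical, since the second field is cut off in the quoted diff. Without `volatile`, or some other happens-before edge, the Java Memory Model allows a reader thread to keep seeing a stale flag value.

```java
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

class RecordingSink
{
    // Contents are thread-safe via concurrent collections; only the flags need volatile.
    final Map<String, Deque<Integer>> messages = new ConcurrentHashMap<>();
    private volatile boolean skipKeepAlive = true;
    private volatile boolean recording = true; // hypothetical second flag

    // Written by the test thread.
    void setSkipKeepAlive(boolean skip) { skipKeepAlive = skip; }
    void setRecording(boolean on) { recording = on; }

    // Called from IO threads; the message type is an int ordinal, as in the PR.
    void record(String peer, int messageType, boolean isKeepAlive)
    {
        if (!recording || (isKeepAlive && skipKeepAlive))
            return;
        messages.computeIfAbsent(peer, k -> new ConcurrentLinkedDeque<>()).add(messageType);
    }
}
```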


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405572845
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -399,22 +446,25 @@ synchronized void addTransferStreams(Collection<OutgoingStream> streams)
 
     private synchronized Future closeSession(State finalState)
     {
-        Future abortedTasksFuture = null;
-        if (isAborted.compareAndSet(false, true))
+        if (isClosed.compareAndSet(false, true))
 
 Review comment:
   +1
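The close-once pattern behind `isClosed.compareAndSet(false, true)`, as a hedged sketch. `CloseOnce` and its counter are hypothetical stand-ins for aborting tasks and closing channels. Only the first caller runs the cleanup; later callers get the same future back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CloseOnce
{
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();
    private Future<Void> closeFuture; // guarded by this

    synchronized Future<Void> close()
    {
        if (isClosed.compareAndSet(false, true))
        {
            cleanupRuns.incrementAndGet(); // stands in for aborting tasks and closing channels
            closeFuture = CompletableFuture.completedFuture(null);
        }
        return closeFuture;
    }

    int cleanupRuns() { return cleanupRuns.get(); }
}
```

In the PR, `closeSession` is already `synchronized`, so a plain boolean under the lock would behave the same. The CAS keeps the guard correct if the method ever stops being synchronized.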


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405576059
 
 

 ##########
 File path: src/java/org/apache/cassandra/utils/FBUtilities.java
 ##########
 @@ -486,6 +486,81 @@ public static int nowInSeconds()
             Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
         }
     }
+
+    /**
+     * Returns a new {@link Future} wrapping the given list of futures and returning a list of their results.
+     */
+    public static Future<List> allOf(Collection<Future> futures)
+    {
+        if (futures.isEmpty())
+            return CompletableFuture.completedFuture(null);
+
+        return new Future<List>()
+        {
+            @Override
+            @SuppressWarnings("unchecked")
+            public List get() throws InterruptedException, ExecutionException
+            {
+                List result = new LinkedList<>();
 
 Review comment:
   Changed to `new ArrayList<>(futures.size())`.
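For reference, here is a generic sketch of an `allOf` that collects into a pre-sized `ArrayList`, as in the change above. It is not the PR's `FBUtilities.allOf`. That method uses raw types and returns `completedFuture(null)` for an empty input. This version returns an empty list instead, and its timed `get` spreads one overall deadline across the futures.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class Futures
{
    static <T> Future<List<T>> allOf(Collection<? extends Future<? extends T>> futures)
    {
        return new Future<List<T>>()
        {
            @Override
            public List<T> get() throws InterruptedException, ExecutionException
            {
                List<T> result = new ArrayList<>(futures.size()); // pre-sized: no resizing, no per-node objects
                for (Future<? extends T> f : futures)
                    result.add(f.get());
                return result;
            }

            @Override
            public List<T> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
            {
                long deadline = System.nanoTime() + unit.toNanos(timeout);
                List<T> result = new ArrayList<>(futures.size());
                for (Future<? extends T> f : futures)
                    result.add(f.get(Math.max(0, deadline - System.nanoTime()), TimeUnit.NANOSECONDS));
                return result;
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning)
            {
                boolean cancelled = false;
                for (Future<? extends T> f : futures)
                    cancelled |= f.cancel(mayInterruptIfRunning);
                return cancelled;
            }

            @Override
            public boolean isCancelled() { return futures.stream().anyMatch(Future::isCancelled); }

            @Override
            public boolean isDone() { return futures.stream().allMatch(Future::isDone); }
        };
    }
}
```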


[GitHub] [cassandra] blerer commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405544359
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -830,4 +893,57 @@ public int getNumTransfers()
     {
         return transferredRangesPerKeyspace.size();
     }
+
+    @VisibleForTesting
+    public static class MessageStateSink
 
 Review comment:
   Making the code easier to test is a good thing; we should just avoid having test-related code in the production code.
   I would have preferred a listener interface here, with the implementation at the test level.
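A hedged sketch of the listener approach suggested above. Production code exposes only a hypothetical `StreamEventListener` interface and a registry, and the recording implementation lives with the tests. States are plain strings here for brevity. This does not by itself solve the in-JVM dtest class-loader constraint mentioned in the `MessageStateSink` comment. An interface loaded by the instance class loader would presumably still need to be among the shared class names to be usable from the test JVM.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Production side: a hook with no-op defaults, no test code.
interface StreamEventListener
{
    default void onMessageReceived(String peer, String messageType) {}
    default void onStateTransition(String peer, String newState) {}
}

class Session
{
    private static final List<StreamEventListener> listeners = new CopyOnWriteArrayList<>();

    static void register(StreamEventListener l) { listeners.add(l); }
    static void unregister(StreamEventListener l) { listeners.remove(l); }

    private final String peer;

    Session(String peer) { this.peer = peer; }

    void transitionTo(String state)
    {
        // ... real state handling would go here ...
        for (StreamEventListener l : listeners)
            l.onStateTransition(peer, state);
    }
}

// Test side: would live under test/, not in the production tree.
class RecordingListener implements StreamEventListener
{
    final List<String> transitions = new CopyOnWriteArrayList<>();

    @Override
    public void onStateTransition(String peer, String newState)
    {
        transitions.add(peer + ":" + newState);
    }
}
```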


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r402417741
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -127,6 +134,9 @@
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
 
+    // for testing purposes: records received messages and state transitions
+    public static volatile MessageStateSink sink = new MessageStateSink();
 
 Review comment:
   Right, `final` is sufficient.
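Why `final` is enough, as a small sketch with a hypothetical `Holder` class. The reference is assigned once during class initialization, which the JVM publishes safely to all threads. Thread safety of the contents comes from the concurrent collections. `volatile` would only matter if tests reassigned the field; clearing the contents avoids that.

```java
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

class Holder
{
    // Assigned once in class initialization; never reassigned.
    static final Map<String, Deque<Integer>> SINK = new ConcurrentHashMap<>();

    static void record(String peer, int ordinal)
    {
        SINK.computeIfAbsent(peer, k -> new ConcurrentLinkedDeque<>()).add(ordinal);
    }

    // Tests reset the contents instead of swapping in a new map.
    static void reset() { SINK.clear(); }
}
```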


[GitHub] [cassandra] blerer commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r405523757
 
 

 ##########
 File path: src/java/org/apache/cassandra/utils/FBUtilities.java
 ##########
 @@ -486,6 +486,81 @@ public static int nowInSeconds()
             Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
         }
     }
+
+    /**
+     * Returns a new {@link Future} wrapping the given list of futures and returning a list of their results.
+     */
+    public static Future<List> allOf(Collection<Future> futures)
+    {
+        if (futures.isEmpty())
+            return CompletableFuture.completedFuture(null);
+
+        return new Future<List>()
+        {
+            @Override
+            @SuppressWarnings("unchecked")
+            public List get() throws InterruptedException, ExecutionException
+            {
+                List result = new LinkedList<>();
 
 Review comment:
   Is there a reason for choosing a `LinkedList` over an `ArrayList` ?


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r406322210
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -153,51 +159,86 @@
 
     final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
 
+    private final boolean isFollower;
     private final NettyStreamingMessageSender messageSender;
-    private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
+    // contains both inbound and outbound channels
+    private final ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
+
+    // "maybeCompleted()" should be executed at most once: it can be executed asynchronously by IO
+    // threads (serialization/deserialization) and by the stream message processing thread, which could close the
+    // connection before the peer's CompleteMessage is received.
+    private boolean maybeCompleted = false;
+    private Future closeFuture;
 
-    private final AtomicBoolean isAborted = new AtomicBoolean(false);
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
+    /**
+     * State Transition:
+     *
+     * <pre>
+     * FIXME
 
 Review comment:
   What should be fixed here?


[GitHub] [cassandra] sbtourist commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
sbtourist commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r402403108
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -127,6 +134,9 @@
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
 
+    // for testing purposes: records received messages and state transitions
+    public static volatile MessageStateSink sink = new MessageStateSink();
 
 Review comment:
   Why is this `volatile` rather than `final`?


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r408108762
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
 ##########
 @@ -262,7 +254,7 @@ else if (message instanceof IncomingStreamMessage)
             if (streamSession == null)
                 throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
 
-            streamSession.attach(channel);
+            streamSession.attachInbound(channel);
 
 Review comment:
   I didn't add an `isControlConnect` param to `attachInbound` because:
   * on the initiator side, the control channel has already been created;
   * on the follower side, the control channel is attached when receiving `StreamInitMessage`.
   
   I will add it to improve clarity.
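A sketch of what the explicit flag could look like. Channels are reduced to hypothetical string ids, and the message sender's control channel is reduced to a field. With the flag, the control channel is injected only when the caller says the connection is a control connection, not whenever the sender happens to lack one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InboundAttach
{
    private final Map<String, String> channels = new ConcurrentHashMap<>();
    private String controlChannel; // guarded by this; stands in for the message sender's control channel

    synchronized boolean attachInbound(String channelId, boolean isControlConnect)
    {
        // Follower side: true for the connection carrying StreamInitMessage.
        // Initiator side: the control channel already exists, so callers pass false.
        if (isControlConnect && controlChannel == null)
            controlChannel = channelId;
        return channels.putIfAbsent(channelId, channelId) == null;
    }

    synchronized String controlChannel() { return controlChannel; }
}
```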


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r406343852
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamSession.java
 ##########
 @@ -153,51 +159,86 @@
 
     final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
 
+    private final boolean isFollower;
     private final NettyStreamingMessageSender messageSender;
-    private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
+    // contains both inbound and outbound channels
+    private final ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();
+
+    // "maybeCompleted()" should be executed at most once: it can be executed asynchronously by IO
+    // threads (serialization/deserialization) and by the stream message processing thread, which could close the
+    // connection before the peer's CompleteMessage is received.
+    private boolean maybeCompleted = false;
+    private Future closeFuture;
 
-    private final AtomicBoolean isAborted = new AtomicBoolean(false);
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
+    /**
+     * State Transition:
+     *
+     * <pre>
+     * FIXME
 
 Review comment:
   Removed it. It was a reminder to update the flow diagram, which has already been updated to reflect that only the follower sends the `Complete` message.


[GitHub] [cassandra] jasonstack commented on a change in pull request #497: Cassandra 15666 trunk

Posted by GitBox <gi...@apache.org>.
jasonstack commented on a change in pull request #497: Cassandra 15666 trunk
URL: https://github.com/apache/cassandra/pull/497#discussion_r406343008
 
 

 ##########
 File path: src/java/org/apache/cassandra/streaming/StreamCoordinator.java
 ##########
 @@ -42,17 +42,19 @@
     private final Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>();
     private final StreamOperation streamOperation;
     private final int connectionsPerHost;
+    private final boolean initiator;
     private StreamConnectionFactory factory;
     private Iterator<StreamSession> sessionsToConnect = null;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
     public StreamCoordinator(StreamOperation streamOperation, int connectionsPerHost, StreamConnectionFactory factory,
-                             boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
+                             boolean initiator, boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
 
 Review comment:
   Good idea, updated.
