You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/06/01 17:16:31 UTC

[GitHub] [cassandra] dcapwell commented on a diff in pull request #1649: CASSANDRA-17663 Ensure FileStreamTask cannot compromise shared channel proxy for system table when interrupted

dcapwell commented on code in PR #1649:
URL: https://github.com/apache/cassandra/pull/1649#discussion_r887055519


##########
src/java/org/apache/cassandra/repair/LocalSyncTask.java:
##########
@@ -105,9 +106,9 @@ StreamPlan createStreamPlan()
      * that will be called out of band once the streams complete.
      */
     @Override
-    protected synchronized void startSync()
+    protected void startSync()
     {
-        if (active)
+        if (active.get())

Review Comment:
   race bug where we can start while failing, can do the following to improve (not solve)
   
   ```
   diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
   index 71cec282ad..1772d2ba42 100644
   --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
   +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
   @@ -19,6 +19,7 @@ package org.apache.cassandra.repair;
    
    import java.util.Collections;
    import java.util.List;
   +import java.util.concurrent.atomic.AtomicReference;
    
    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Preconditions;
   @@ -58,8 +59,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
        @VisibleForTesting
        public final boolean transferRanges;
    
   -    private boolean active = true;
   -    private StreamPlan streamPlan;
   +    private enum States { INIT, STARTING, START, COMPLETE }
   +    private final AtomicReference<States> active = new AtomicReference<>(States.INIT);
   +    private volatile StreamPlan streamPlan;
    
        public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
                             List<Range<Token>> diff, TimeUUID pendingRepair,
   @@ -107,7 +109,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
        @Override
        protected synchronized void startSync()
        {
   -        if (active)
   +        if (active.compareAndSet(States.INIT, States.STARTING))
            {
                InetAddressAndPort remote = nodePair.peer;
    
   @@ -115,8 +117,15 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
                logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
                Tracing.traceRepair(message);
    
   -            streamPlan = createStreamPlan();
   -            streamPlan.execute();
   +            if (active.compareAndSet(States.STARTING, States.START))
   +            {
   +                streamPlan = createStreamPlan();
   +                streamPlan.execute();
   +            }
   +            else
   +            {
   +                logger.warn("Unable to start stream plan due to active state being {}", active.get());
   +            }
            }
        }
    
   @@ -155,9 +164,8 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
        @Override
        public synchronized void onSuccess(StreamState result)
        {
   -        if (active)
   +        if (maybeComplete())
            {
   -            active = false;
                String status = result.hasAbortedSession() ? "aborted" : "complete";
                String message = String.format("Sync %s using session %s between %s and %s on %s",
                                               status, desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily);
   @@ -171,9 +179,8 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
        @Override
        public synchronized void onFailure(Throwable t)
        {
   -        if (active)
   +        if (maybeComplete())
            {
   -            active = false;
                tryFailure(t);
                finished();
            }
   @@ -193,11 +200,20 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
        @Override
        public synchronized void abort()
        {
   -        if (active)
   +        if (active.compareAndSet(States.START, States.COMPLETE))
   +        {
   +            // stream plan is being built... wait for it
   +            StreamPlan plan;
   +            while ((plan = streamPlan) == null)
   +            {
   +                // wait for it
   +            }
   +            plan.getCoordinator().getAllStreamSessions().forEach(StreamSession::abort);
   +        }
   +        else if (maybeComplete())
            {
                if (streamPlan == null)
                {
   -                active = false;
                    String message = String.format("Sync for session %s between %s and %s on %s aborted before starting",
                                                   desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily);
                    logger.debug("{} {}", previewKind.logPrefix(desc.sessionId), message);
   @@ -209,4 +225,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
                }
            }
        }
   +
   +    private boolean maybeComplete()
   +    {
   +        return active.compareAndSet(States.STARTING, States.COMPLETE) || active.compareAndSet(States.INIT, States.COMPLETE);
   +    }
    }
   
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org