You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/15 18:02:58 UTC

[2/3] cassandra git commit: Remove parent session on remotes when repair fails

Remove parent session on remotes when repair fails

Follow up on CASSANDRA-9097


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01087489
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01087489
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01087489

Branch: refs/heads/trunk
Commit: 01087489ed5484a2f7c5e7d55e8adb8bcf51b7a7
Parents: 3476a25
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu May 14 19:07:05 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri May 15 10:54:41 2015 -0500

----------------------------------------------------------------------
 .../cassandra/repair/AnticompactionTask.java    | 15 ++++-
 .../repair/RepairMessageVerbHandler.java        |  9 ++-
 .../repair/messages/CleanupMessage.java         | 61 ++++++++++++++++++++
 .../repair/messages/RepairMessage.java          |  3 +-
 .../cassandra/service/ActiveRepairService.java  | 20 +++----
 5 files changed, 93 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index e505d91..f41d26c 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AnticompactionRequest;
+import org.apache.cassandra.repair.messages.CleanupMessage;
 import org.apache.cassandra.utils.SemanticVersion;
 
 public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
@@ -40,11 +41,13 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
     private final UUID parentSession;
     private final InetAddress neighbor;
+    private final boolean doAnticompaction;
 
-    public AnticompactionTask(UUID parentSession, InetAddress neighbor)
+    public AnticompactionTask(UUID parentSession, InetAddress neighbor, boolean doAnticompaction)
     {
         this.parentSession = parentSession;
         this.neighbor = neighbor;
+        this.doAnticompaction = doAnticompaction;
     }
 
     public void run()
@@ -53,7 +56,15 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
         SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
         if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
         {
-            MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+            if (doAnticompaction)
+            {
+                MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+            }
+            else
+            {
+                // we need to clean up parent session
+                MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+            }
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 60b2243..872978e 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Future;
 
 import com.google.common.base.Predicate;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -44,7 +43,6 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -125,6 +123,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     }, MoreExecutors.sameThreadExecutor());
                     break;
 
+                case CLEANUP:
+                    logger.debug("cleaning up repair");
+                    CleanupMessage cleanup = (CleanupMessage) message.payload;
+                    ActiveRepairService.instance.removeParentRepairSession(cleanup.parentRepairSession);
+                    MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                    break;
+
                 default:
                     ActiveRepairService.instance.handleMessage(message.from, message.payload);
                     break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
new file mode 100644
index 0000000..6d702ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * Message to cleanup repair resources on replica nodes.
+ *
+ * @since 2.1.6
+ */
+public class CleanupMessage extends RepairMessage
+{
+    public static MessageSerializer serializer = new CleanupMessageSerializer();
+    public final UUID parentRepairSession;
+
+    public CleanupMessage(UUID parentRepairSession)
+    {
+        super(Type.CLEANUP, null);
+        this.parentRepairSession = parentRepairSession;
+    }
+
+    public static class CleanupMessageSerializer implements MessageSerializer<CleanupMessage>
+    {
+        public void serialize(CleanupMessage message, DataOutputPlus out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
+        }
+
+        public CleanupMessage deserialize(DataInput in, int version) throws IOException
+        {
+            UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version);
+            return new CleanupMessage(parentRepairSession);
+        }
+
+        public long serializedSize(CleanupMessage message, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index d500928..b49f1f3 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -45,7 +45,8 @@ public abstract class RepairMessage
         SYNC_COMPLETE(3, SyncComplete.serializer),
         ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
         PREPARE_MESSAGE(5, PrepareMessage.serializer),
-        SNAPSHOT(6, SnapshotMessage.serializer);
+        SNAPSHOT(6, SnapshotMessage.serializer),
+        CLEANUP(7, CleanupMessage.serializer);
 
         private final byte type;
         private final MessageSerializer<RepairMessage> serializer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8d3563c..4266f41 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -325,29 +325,29 @@ public class ActiveRepairService
      *
      * @param parentSession Parent session ID
      * @param neighbors Repair participants (not including self)
-     * @param doAntiCompaction true if repair session needs anti compaction
      * @throws InterruptedException
      * @throws ExecutionException
      */
     public synchronized ListenableFuture<?> finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException
     {
+        // We want to remove parent repair session whether we succeeded or not, so send AnticompactionRequest anyway.
+        // Each replica node determines if anticompaction is needed.
+        List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
+        for (InetAddress neighbor : neighbors)
+        {
+            AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, doAntiCompaction);
+            tasks.add(task);
+            task.run(); // 'run' is just sending message
+        }
         if (doAntiCompaction)
         {
-            List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
-            for (InetAddress neighbor : neighbors)
-            {
-                AnticompactionTask task = new AnticompactionTask(parentSession, neighbor);
-                tasks.add(task);
-                task.run(); // 'run' is just sending message
-            }
             tasks.add(doAntiCompaction(parentSession));
-            return Futures.successfulAsList(tasks);
         }
         else
         {
             removeParentRepairSession(parentSession);
-            return Futures.immediateFuture(null);
         }
+        return Futures.successfulAsList(tasks);
     }
 
     public ParentRepairSession getParentRepairSession(UUID parentSessionId)