You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/15 18:02:58 UTC
[2/3] cassandra git commit: Remove parent session on remotes when repair fails
Remove parent session on remotes when repair fails
Follow up on CASSANDRA-9097
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01087489
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01087489
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01087489
Branch: refs/heads/trunk
Commit: 01087489ed5484a2f7c5e7d55e8adb8bcf51b7a7
Parents: 3476a25
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu May 14 19:07:05 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri May 15 10:54:41 2015 -0500
----------------------------------------------------------------------
.../cassandra/repair/AnticompactionTask.java | 15 ++++-
.../repair/RepairMessageVerbHandler.java | 9 ++-
.../repair/messages/CleanupMessage.java | 61 ++++++++++++++++++++
.../repair/messages/RepairMessage.java | 3 +-
.../cassandra/service/ActiveRepairService.java | 20 +++----
5 files changed, 93 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index e505d91..f41d26c 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AnticompactionRequest;
+import org.apache.cassandra.repair.messages.CleanupMessage;
import org.apache.cassandra.utils.SemanticVersion;
public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
@@ -40,11 +41,13 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
private final UUID parentSession;
private final InetAddress neighbor;
+ private final boolean doAnticompaction;
- public AnticompactionTask(UUID parentSession, InetAddress neighbor)
+ public AnticompactionTask(UUID parentSession, InetAddress neighbor, boolean doAnticompaction)
{
this.parentSession = parentSession;
this.neighbor = neighbor;
+ this.doAnticompaction = doAnticompaction;
}
public void run()
@@ -53,7 +56,15 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
{
- MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ if (doAnticompaction)
+ {
+ MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ }
+ else
+ {
+ // we need to clean up parent session
+ MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ }
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 60b2243..872978e 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.Future;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
@@ -44,7 +43,6 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
@@ -125,6 +123,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
}, MoreExecutors.sameThreadExecutor());
break;
+ case CLEANUP:
+ logger.debug("cleaning up repair");
+ CleanupMessage cleanup = (CleanupMessage) message.payload;
+ ActiveRepairService.instance.removeParentRepairSession(cleanup.parentRepairSession);
+ MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+ break;
+
default:
ActiveRepairService.instance.handleMessage(message.from, message.payload);
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
new file mode 100644
index 0000000..6d702ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/CleanupMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * Message asking a replica node to clean up the repair resources held for one parent repair session.
+ *
+ * @since 2.1.6
+ */
+public class CleanupMessage extends RepairMessage
+{
+ public static MessageSerializer serializer = new CleanupMessageSerializer(); // the serializer registered for the CLEANUP message type
+ public final UUID parentRepairSession; // ID of the parent repair session the receiving node removes
+
+ public CleanupMessage(UUID parentRepairSession)
+ {
+ super(Type.CLEANUP, null); // NOTE(review): second argument is null, presumably because no per-job descriptor applies here; confirm against RepairMessage
+ this.parentRepairSession = parentRepairSession;
+ }
+
+ public static class CleanupMessageSerializer implements MessageSerializer<CleanupMessage> // payload is the parent session UUID and nothing else
+ {
+ public void serialize(CleanupMessage message, DataOutputPlus out, int version) throws IOException
+ {
+ UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); // only field written
+ }
+
+ public CleanupMessage deserialize(DataInput in, int version) throws IOException
+ {
+ UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version); // mirrors serialize(): read the single UUID
+ return new CleanupMessage(parentRepairSession);
+ }
+
+ public long serializedSize(CleanupMessage message, int version)
+ {
+ return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); // size of the one serialized UUID
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index d500928..b49f1f3 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -45,7 +45,8 @@ public abstract class RepairMessage
SYNC_COMPLETE(3, SyncComplete.serializer),
ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
PREPARE_MESSAGE(5, PrepareMessage.serializer),
- SNAPSHOT(6, SnapshotMessage.serializer);
+ SNAPSHOT(6, SnapshotMessage.serializer),
+ CLEANUP(7, CleanupMessage.serializer);
private final byte type;
private final MessageSerializer<RepairMessage> serializer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01087489/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8d3563c..4266f41 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -325,29 +325,29 @@ public class ActiveRepairService
*
* @param parentSession Parent session ID
* @param neighbors Repair participants (not including self)
- * @param doAntiCompaction true if repair session needs anti compaction
* @throws InterruptedException
* @throws ExecutionException
*/
public synchronized ListenableFuture<?> finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException
{
+ // We want to remove the parent repair session whether the repair succeeded or not, so contact every neighbor anyway.
+ // AnticompactionTask sends an AnticompactionRequest when doAntiCompaction is true and a CleanupMessage otherwise.
+ List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
+ for (InetAddress neighbor : neighbors)
+ {
+ AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, doAntiCompaction);
+ tasks.add(task);
+ task.run(); // 'run' is just sending message
+ }
if (doAntiCompaction)
{
- List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
- for (InetAddress neighbor : neighbors)
- {
- AnticompactionTask task = new AnticompactionTask(parentSession, neighbor);
- tasks.add(task);
- task.run(); // 'run' is just sending message
- }
tasks.add(doAntiCompaction(parentSession));
- return Futures.successfulAsList(tasks);
}
else
{
removeParentRepairSession(parentSession);
- return Futures.immediateFuture(null);
}
+ return Futures.successfulAsList(tasks);
}
public ParentRepairSession getParentRepairSession(UUID parentSessionId)