You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/25 19:07:21 UTC

cassandra git commit: Avoid anticompaction after non-global repairs

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 134bcda0c -> 842f1509d


Avoid anticompaction after non-global repairs

Patch by marcuse; reviewed by sankalp kohli for CASSANDRA-9142


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/842f1509
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/842f1509
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/842f1509

Branch: refs/heads/cassandra-2.2
Commit: 842f1509d46bf068abca1e064f23454892347e60
Parents: 134bcda
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jun 1 15:36:17 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Aug 25 19:00:12 2015 +0200

----------------------------------------------------------------------
 .../repair/RepairMessageVerbHandler.java        | 14 +++++-
 .../apache/cassandra/repair/RepairRunnable.java |  2 +-
 .../repair/messages/PrepareMessage.java         |  7 +--
 .../repair/messages/RepairMessage.java          |  1 +
 .../cassandra/repair/messages/RepairOption.java |  4 ++
 .../cassandra/service/ActiveRepairService.java  | 52 ++++++++++++++------
 .../LeveledCompactionStrategyTest.java          |  2 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |  2 +-
 8 files changed, 60 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c0855c4..796f135 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.LocalPartitioner;
@@ -41,6 +42,7 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -59,6 +61,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
         {
             switch (message.payload.messageType)
             {
+                case PREPARE_GLOBAL_MESSAGE:
                 case PREPARE_MESSAGE:
                     PrepareMessage prepareMessage = (PrepareMessage) message.payload;
                     logger.debug("Preparing, {}", prepareMessage);
@@ -69,10 +72,17 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                         ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
                         columnFamilyStores.add(columnFamilyStore);
                     }
+                    CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from);
+                    // note that isGlobal defaults to true when the peer's version is unknown or predates the global prepare flag, since old versions always default to true:
+                    boolean isGlobal = peerVersion == null ||
+                                       peerVersion.compareTo(ActiveRepairService.SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) < 0 ||
+                                       message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE);
+                    logger.debug("Received prepare message: global message = {}, peerVersion = {},", message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE), peerVersion);
                     ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
                             columnFamilyStores,
                             prepareMessage.ranges,
-                            prepareMessage.isIncremental);
+                            prepareMessage.isIncremental,
+                            isGlobal);
                     MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                     break;
 
@@ -117,7 +127,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     logger.debug("Syncing {}", request);
                     long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
                     if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null)
-                        repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
+                        repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt();
 
                     StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt);
                     task.run();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 28511db..91ac82a 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -191,7 +191,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         try
         {
             ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, options, columnFamilyStores);
-            repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
+            repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
             progress.incrementAndGet();
         }
         catch (Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 37dc07c..a57c27e 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -41,9 +41,9 @@ public class PrepareMessage extends RepairMessage
     public final UUID parentRepairSession;
     public final boolean isIncremental;
 
-    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental)
+    public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
     {
-        super(Type.PREPARE_MESSAGE, null);
+        super(isGlobal ? Type.PREPARE_GLOBAL_MESSAGE : Type.PREPARE_MESSAGE, null);
         this.parentRepairSession = parentRepairSession;
         this.cfIds = cfIds;
         this.ranges = ranges;
@@ -79,7 +79,8 @@ public class PrepareMessage extends RepairMessage
             for (int i = 0; i < rangeCount; i++)
                 ranges.add((Range<Token>) Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version));
             boolean isIncremental = in.readBoolean();
-            return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental);
+
+            return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, false);
         }
 
         public long serializedSize(PrepareMessage message, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 6b5226d..d78c2fd 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -45,6 +45,7 @@ public abstract class RepairMessage
         SYNC_COMPLETE(3, SyncComplete.serializer),
         ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
         PREPARE_MESSAGE(5, PrepareMessage.serializer),
+        PREPARE_GLOBAL_MESSAGE(8, PrepareMessage.serializer),
         SNAPSHOT(6, SnapshotMessage.serializer),
         CLEANUP(7, CleanupMessage.serializer);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 7b9a9af..f3e452c 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -287,6 +287,10 @@ public class RepairOption
         return hosts;
     }
 
+    public boolean isGlobal()
+    {
+        return dataCenters.isEmpty() && hosts.isEmpty();
+    }
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 213edeb..a6389ea 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -54,6 +55,7 @@ import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.RepairSession;
 import org.apache.cassandra.repair.messages.*;
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Ref;
@@ -75,6 +77,8 @@ import org.apache.cassandra.utils.concurrent.Refs;
  */
 public class ActiveRepairService
 {
+    public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
+
     private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
     // singleton enforcement
     public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
@@ -233,7 +237,7 @@ public class ActiveRepairService
 
     public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
     {
-        registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental());
+        registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal());
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
         final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -263,7 +267,10 @@ public class ActiveRepairService
 
         for (InetAddress neighbour : endpoints)
         {
-            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental());
+            CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour);
+            boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
+            logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion);
+            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal);
             MessageOut<RepairMessage> msg = message.createMessage();
             MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
         }
@@ -286,9 +293,9 @@ public class ActiveRepairService
         return parentRepairSession;
     }
 
-    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental)
+    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
     {
-        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis()));
+        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis()));
     }
 
     public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)
@@ -313,15 +320,15 @@ public class ActiveRepairService
      */
     public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
     {
-            List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
-            for (InetAddress neighbor : neighbors)
-            {
-                AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
-                tasks.add(task);
-                task.run(); // 'run' is just sending message
-            }
-            tasks.add(doAntiCompaction(parentSession, successfulRanges));
-            return Futures.successfulAsList(tasks);
+        List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
+        for (InetAddress neighbor : neighbors)
+        {
+            AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+            tasks.add(task);
+            task.run(); // 'run' is just sending message
+        }
+        tasks.add(doAntiCompaction(parentSession, successfulRanges));
+        return Futures.successfulAsList(tasks);
     }
 
     public ParentRepairSession getParentRepairSession(UUID parentSessionId)
@@ -346,6 +353,12 @@ public class ActiveRepairService
     {
         assert parentRepairSession != null;
         ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+        if (!prs.isGlobal)
+        {
+            logger.info("Not a global repair, will not do anticompaction");
+            removeParentRepairSession(parentRepairSession);
+            return Futures.immediateFuture(Collections.emptyList());
+        }
         assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
 
         List<ListenableFuture<?>> futures = new ArrayList<>();
@@ -400,15 +413,17 @@ public class ActiveRepairService
         private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
         private final Collection<Range<Token>> ranges;
         private final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>();
-        public final long repairedAt;
+        private final long repairedAt;
         public final boolean isIncremental;
+        private final boolean isGlobal;
 
-        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt)
+        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
         {
             for (ColumnFamilyStore cfs : columnFamilyStores)
                 this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
             this.ranges = ranges;
             this.repairedAt = repairedAt;
+            this.isGlobal = isGlobal;
             this.isIncremental = isIncremental;
         }
 
@@ -445,7 +460,12 @@ public class ActiveRepairService
             }
             return new Refs<>(references.build());
         }
-
+        public long getRepairedAt()
+        {
+            if (isGlobal)
+                return repairedAt;
+            return ActiveRepairService.UNREPAIRED_SSTABLE;
+        }
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 03aaf03..63fd0e7 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -197,7 +197,7 @@ public class LeveledCompactionStrategyTest
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis());
         UUID parentRepSession = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false);
+        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, true);
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 3a16262..e5c03b9 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -89,7 +89,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false);
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, false);
 
         RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);