You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/08/18 17:49:23 UTC
cassandra git commit: Don't delete incremental repair sessions if
they still have sstables
Repository: cassandra
Updated Branches:
refs/heads/trunk c066f126e -> e1a1b80d4
Don't delete incremental repair sessions if they still have sstables
Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13758
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1a1b80d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1a1b80d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1a1b80d
Branch: refs/heads/trunk
Commit: e1a1b80d424e31eeb5805c710ce010953160e3a4
Parents: c066f12
Author: Blake Eggleston <bd...@gmail.com>
Authored: Fri Aug 11 14:23:22 2017 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Fri Aug 18 10:48:00 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compaction/CompactionStrategyManager.java | 5 +++
.../db/compaction/PendingRepairManager.java | 5 +++
.../repair/consistent/LocalSessions.java | 33 ++++++++++++----
.../db/compaction/PendingRepairManagerTest.java | 15 +++++++
.../repair/consistent/LocalSessionTest.java | 41 +++++++++++++++++++-
6 files changed, 90 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e015a0b..a02a7bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
* Fix pending repair manager index out of bounds check (CASSANDRA-13769)
* Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
* Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 6342a1b..3b1bc41 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -339,6 +339,11 @@ public class CompactionStrategyManager implements INotificationConsumer
return ids;
}
+ public boolean hasDataForPendingRepair(UUID sessionID)
+ {
+ return Iterables.any(pendingRepairs, prm -> prm.hasDataForSession(sessionID));
+ }
+
public void shutdown()
{
writeLock.lock();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 4596381..8ee6025 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -398,6 +398,11 @@ class PendingRepairManager
return strategies.values().contains(strategy);
}
+ public synchronized boolean hasDataForSession(UUID sessionID)
+ {
+ return strategies.keySet().contains(sessionID);
+ }
+
public Collection<AbstractCompactionTask> createUserDefinedTasks(List<SSTableReader> sstables, int gcBefore)
{
Map<UUID, List<SSTableReader>> group = sstables.stream().collect(Collectors.groupingBy(s -> s.getSSTableMetadata().pendingRepair));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index a25f65c..4ef2c2c 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -32,7 +32,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-
+import java.util.function.Predicate;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
@@ -45,18 +45,15 @@ import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.InetAddressType;
import org.apache.cassandra.db.marshal.UUIDType;
@@ -77,6 +74,8 @@ import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.StatusRequest;
import org.apache.cassandra.repair.messages.StatusResponse;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
@@ -260,8 +259,15 @@ public class LocalSessions
}
else if (shouldDelete(session, now))
{
- logger.debug("Auto deleting repair session {}", session);
- deleteSession(session.sessionID);
+ if (!sessionHasData(session))
+ {
+ logger.debug("Auto deleting repair session {}", session);
+ deleteSession(session.sessionID);
+ }
+ else
+ {
+ logger.warn("Skipping delete of LocalSession {} because it still contains sstables", session.sessionID);
+ }
}
else if (shouldCheckStatus(session, now))
{
@@ -737,6 +743,17 @@ public class LocalSessions
return session != null && session.getState() != FINALIZED && session.getState() != FAILED;
}
+ @VisibleForTesting
+ protected boolean sessionHasData(LocalSession session)
+ {
+ Predicate<TableId> predicate = tid -> {
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid);
+ return cfs != null && cfs.getCompactionStrategyManager().hasDataForPendingRepair(session.sessionID);
+
+ };
+ return Iterables.any(session.tableIds, predicate::test);
+ }
+
/**
* Returns the repairedAt time for a sessions which is unknown, failed, or finalized
* calling this for a session which is in progress throws an exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
index 33e996b..2b88c9c 100644
--- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@ -285,4 +285,19 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
SSTableReader sstable = makeSSTable(true);
prm.getOrCreate(sstable);
}
+
+ @Test
+ public void sessionHasData()
+ {
+ PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+
+ UUID repairID = registerSession(cfs, true, true);
+ LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+
+ Assert.assertFalse(prm.hasDataForSession(repairID));
+ SSTableReader sstable = makeSSTable(true);
+ mutateRepaired(sstable, repairID);
+ prm.addSSTable(sstable);
+ Assert.assertTrue(prm.hasDataForSession(repairID));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index c59462e..be048fb 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -182,6 +182,12 @@ public class LocalSessionTest extends AbstractRepairTest
int calls = completedSessions.getOrDefault(sessionID, 0);
completedSessions.put(sessionID, calls + 1);
}
+
+ boolean sessionHasData = false;
+ protected boolean sessionHasData(LocalSession session)
+ {
+ return sessionHasData;
+ }
}
private static TableMetadata cfm;
@@ -865,10 +871,10 @@ public class LocalSessionTest extends AbstractRepairTest
}
/**
- * Sessions past the auto delete cutoff should be deleted
+ * Sessions past the auto delete cutoff with no sstables should be deleted
*/
@Test
- public void cleanupDelete() throws Exception
+ public void cleanupDeleteNoSSTables() throws Exception
{
LocalSessions sessions = new InstrumentedLocalSessions();
sessions.start();
@@ -895,6 +901,37 @@ public class LocalSessionTest extends AbstractRepairTest
}
/**
+ * Sessions past the auto delete cutoff which still have sstables should not be deleted
+ */
+ @Test
+ public void cleanupDeleteSSTablesRemaining() throws Exception
+ {
+ InstrumentedLocalSessions sessions = new InstrumentedLocalSessions();
+ sessions.start();
+
+ int time = FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT - 1;
+ LocalSession failed = sessionWithTime(time - 1, time);
+ failed.setState(FAILED);
+
+ LocalSession finalized = sessionWithTime(time - 1, time);
+ finalized.setState(FINALIZED);
+
+ sessions.putSessionUnsafe(failed);
+ sessions.putSessionUnsafe(finalized);
+ Assert.assertNotNull(sessions.getSession(failed.sessionID));
+ Assert.assertNotNull(sessions.getSession(finalized.sessionID));
+
+ sessions.sessionHasData = true;
+ sessions.cleanup();
+
+ Assert.assertNotNull(sessions.getSession(failed.sessionID));
+ Assert.assertNotNull(sessions.getSession(finalized.sessionID));
+
+ Assert.assertNotNull(sessions.loadUnsafe(failed.sessionID));
+ Assert.assertNotNull(sessions.loadUnsafe(finalized.sessionID));
+ }
+
+ /**
* Sessions should start checking the status of their participants if
* there hasn't been activity for the CHECK_STATUS_TIMEOUT period
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org