You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/22 22:13:46 UTC

cassandra git commit: Add forceUserDefinedCleanup to allow more flexible cleanup

Repository: cassandra
Updated Branches:
  refs/heads/trunk 3120b8b65 -> cae395026


Add forceUserDefinedCleanup to allow more flexible cleanup

patch by jeffj; reviewed by yukim for CASSANDRA-10708


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cae39502
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cae39502
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cae39502

Branch: refs/heads/trunk
Commit: cae395026af3f82afbbee6d2ab090f985ee006d3
Parents: 3120b8b
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Tue Dec 22 15:11:25 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 22 15:11:25 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 56 ++++++++++++++++++++
 .../db/compaction/CompactionManagerMBean.java   | 11 ++++
 .../org/apache/cassandra/db/CleanupTest.java    | 26 +++++++++
 4 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b7f389..9f82731 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Add forceUserDefinedCleanup to allow more flexible cleanup (CASSANDRA-10708)
  * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
  * Fix EstimatedHistogram creation in nodetool tablehistograms (CASSANDRA-10859)
  * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fafab69..cfffa14 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -609,6 +609,62 @@ public class CompactionManager implements CompactionManagerMBean
         FBUtilities.waitOnFutures(futures);
     }
 
+    public void forceUserDefinedCleanup(String dataFiles)
+    {
+        String[] filenames = dataFiles.split(","); // dataFiles is a comma separated list of sstable file names
+        HashMap<ColumnFamilyStore, Descriptor> descriptors = Maps.newHashMap(); // table store -> descriptor of the file to clean
+
+        for (String filename : filenames)
+        {
+            // parse the trimmed file name into a Descriptor; its ksname/cfname identify the owning table
+            Descriptor desc = Descriptor.fromFilename(filename.trim());
+            if (Schema.instance.getCFMetaData(desc) == null)
+            {
+                logger.warn("Schema does not exist for file {}. Skipping.", filename);
+                continue;
+            }
+            // open the owning table's store and look the bare file name up again through that store's Directories
+            ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+            desc = cfs.getDirectories().find(new File(filename.trim()).getName());
+            if (desc != null) // files for which find() returns null are dropped without a log message
+                descriptors.put(cfs, desc); // NOTE(review): put() replaces an earlier descriptor stored for the same cfs, so apparently only one file per table survives - confirm intended
+        }
+
+        for (Map.Entry<ColumnFamilyStore,Descriptor> entry : descriptors.entrySet())
+        {
+            ColumnFamilyStore cfs = entry.getKey();
+            Keyspace keyspace = cfs.keyspace;
+            Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+            boolean hasIndexes = cfs.indexManager.hasIndexes();
+            SSTableReader sstable = lookupSSTable(cfs, entry.getValue()); // a null result is reported below as "not an active sstable"
+
+            if (ranges.isEmpty())
+            {
+                logger.error("Cleanup cannot run before a node has joined the ring");
+                return; // abandons the whole call, including any entries not yet visited
+            }
+
+            if(sstable == null)
+            {
+                logger.warn("Will not clean {}, it is not an active sstable", entry.getValue());
+            }
+            else
+            {
+                LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.CLEANUP); // NOTE(review): txn is neither null-checked nor closed in this block - confirm tryModify/doCleanupOne cover both
+                CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, FBUtilities.nowInSeconds());
+                try
+                {
+                    doCleanupOne(cfs, txn, cleanupStrategy, ranges, hasIndexes);
+                }
+                catch (IOException e) // logged and swallowed, so the loop continues with the next entry
+                {
+                    logger.error(String.format("forceUserDefinedCleanup failed: %s", e.getLocalizedMessage())); // only the message text is logged; the exception itself is not passed to the logger
+                }
+            }
+        }
+    }
+
+
     public Future<?> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
     {
         Runnable runnable = new WrappedRunnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index d5da0fe..bb67d5f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -45,6 +45,17 @@ public interface CompactionManagerMBean
     public void forceUserDefinedCompaction(String dataFiles);
 
     /**
+     * Triggers the cleanup of user specified sstables.
+     * You can specify files from various keyspaces and columnfamilies.
+     * If you do so, cleanup is performed on each file individually.
+     *
+     * @param dataFiles a comma separated list of sstable files to clean up.
+     *                  Each must contain the keyspace and columnfamily name in its path (for 2.1+) or in the file name itself.
+     */
+    public void forceUserDefinedCleanup(String dataFiles);
+
+
+    /**
      * Stop all running compaction-like tasks having the provided {@code type}.
      * @param type the type of compaction to stop. Can be one of:
      *   - COMPACTION

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cae39502/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 167f3b0..c15bdb4 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -174,6 +174,32 @@ public class CleanupTest
     }
 
     @Test
+    public void testuserDefinedCleanupWithNewToken() throws ExecutionException, InterruptedException, UnknownHostException
+    {
+        StorageService.instance.getTokenMetadata().clearUnsafe(); // reset the shared token metadata before the test
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+
+        // fill the table via fillCF and verify that a query over cfs returns LOOPS results
+        fillCF(cfs, "val", LOOPS);
+
+        assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        byte[] tk1 = new byte[1], tk2 = new byte[1]; // two single-byte tokens
+        tk1[0] = 2;
+        tk2[0] = 1;
+        tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); // token 2 -> 127.0.0.1
+        tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); // token 1 -> 127.0.0.2
+
+        for(SSTableReader r: cfs.getLiveSSTables()) // one forceUserDefinedCleanup call per live sstable file name
+            CompactionManager.instance.forceUserDefinedCleanup(r.getFilename());
+
+        assertEquals(0, Util.getAll(Util.cmd(cfs).build()).size()); // the same query must now return nothing; presumably no inserted key falls in the local ranges after the token change
+    }
+
+    @Test
     public void testNeedsCleanup() throws Exception
     {
         // setup