You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/03/05 08:17:18 UTC

[2/6] cassandra git commit: Fully utilise specified compaction threads (jobs)

Fully utilise specified compaction threads (jobs)

Patch by Kurt Greaves; reviewed by marcuse for CASSANDRA-14210


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f88ec935
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f88ec935
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f88ec935

Branch: refs/heads/cassandra-3.11
Commit: f88ec9357de406daad0f795951f17e5f854ade10
Parents: 79cead0
Author: kurt <ku...@instaclustr.com>
Authored: Mon Feb 12 21:06:34 2018 +0000
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Mar 5 09:09:52 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 11 +++---
 .../io/sstable/format/SSTableReader.java        |  2 +
 .../org/apache/cassandra/utils/FBUtilities.java | 36 ++++++++++++++++++
 .../apache/cassandra/utils/FBUtilitiesTest.java | 39 ++++++++++++++++++++
 5 files changed, 84 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cf665e..5599906 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.17
+ * Fully utilise specified compaction threads (CASSANDRA-14210)
  * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763)
 Merged from 2.1:
  * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1d54667..a8e6931 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -327,8 +327,8 @@ public class CompactionManager implements CompactionManagerMBean
 
                 if (jobs > 0 && futures.size() == jobs)
                 {
-                    FBUtilities.waitOnFutures(futures);
-                    futures.clear();
+                    Future<?> f = FBUtilities.waitOnFirstFuture(futures);
+                    futures.remove(f);
                 }
             }
             FBUtilities.waitOnFutures(futures);
@@ -416,8 +416,9 @@ public class CompactionManager implements CompactionManagerMBean
             @Override
             public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
             {
-                Iterable<SSTableReader> sstables = new ArrayList<>(transaction.originals());
-                Iterator<SSTableReader> iter = sstables.iterator();
+                List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
+                Collections.sort(sortedSSTables, SSTableReader.sizeComparator.reversed());
+                Iterator<SSTableReader> iter = sortedSSTables.iterator();
                 while (iter.hasNext())
                 {
                     SSTableReader sstable = iter.next();
@@ -427,7 +428,7 @@ public class CompactionManager implements CompactionManagerMBean
                         iter.remove();
                     }
                 }
-                return sstables;
+                return sortedSSTables;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 7e1bc1a..2c94e45 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -152,6 +152,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 
+    public static final Comparator<SSTableReader> sizeComparator = (o1, o2) -> Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+
     /**
      * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound
      * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index f111919..268e54b 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -397,6 +398,41 @@ public class FBUtilities
             result.get(ms, TimeUnit.MILLISECONDS);
     }
 
+    public static <T> Future<? extends T> waitOnFirstFuture(Iterable<? extends Future<? extends T>> futures)
+    {
+        return waitOnFirstFuture(futures, 100); // delegate to the overload below, re-checking the futures every 100 ms
+    }
+    /**
+     * Only wait for the first future to finish from a list of futures. Blocks until at least 1 future finishes, re-checking all of them every {@code delay} milliseconds.
+     * @param futures The futures to wait on; they are scanned in iteration order on every pass
+     * @return the first future, in iteration order, that was found to be done (its result has already been fetched once via get())
+     */
+    public static <T> Future<? extends T> waitOnFirstFuture(Iterable<? extends Future<? extends T>> futures, long delay)
+    {
+        while (true) // NOTE(review): only exits through the return/throws below, so an empty iterable never returns
+        {
+            for (Future<? extends T> f : futures)
+            {
+                if (f.isDone())
+                {
+                    try
+                    {
+                        f.get(); // result is discarded; the call is made so that a failed task surfaces as an exception here
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new AssertionError(e); // presumably treated as unreachable because f is already done -- NOTE(review): confirm for all Future implementations
+                    }
+                    catch (ExecutionException e)
+                    {
+                        throw new RuntimeException(e); // rethrow the task's failure unchecked, keeping the ExecutionException as cause
+                    }
+                    return f;
+                }
+            }
+            Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); // no future was done on this pass: pause before scanning again
+        }
+    }
     /**
      * Create a new instance of a partitioner defined in an SSTable Descriptor
      * @param desc Descriptor of an sstable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f88ec935/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
index acd68eb..c5126a0 100644
--- a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
@@ -23,6 +23,15 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.primitives.Ints;
 import org.junit.Test;
@@ -127,4 +136,34 @@ public class FBUtilitiesTest
 
         FBUtilities.reset();
     }
+
+    @Test
+    public void testWaitFirstFuture() throws ExecutionException, InterruptedException
+    {
+        // Submits four tasks sleeping 40, 30, 20 and 10 ms and expects waitOnFirstFuture to hand them back shortest sleep first.
+        ExecutorService executor = Executors.newFixedThreadPool(4); // one thread per task; NOTE(review): the executor is never shut down in this method
+        FBUtilities.reset();
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 4; i >= 1; i--) // longest sleep is submitted first, so list order is the reverse of the expected completion order
+        {
+            final int sleep = i * 10; // sleep time in ms, also used as the task's return value
+            futures.add(executor.submit(() -> { TimeUnit.MILLISECONDS.sleep(sleep); return sleep; }));
+        }
+        Future<?> fut = FBUtilities.waitOnFirstFuture(futures, 3); // poll every 3 ms; NOTE(review): relies on the 10 ms gaps between tasks, so timing-sensitive
+        int futSleep = (Integer) fut.get();
+        assertEquals(futSleep, 10); // NOTE(review): if this is JUnit's assertEquals the convention is (expected, actual); arguments look swapped
+        futures.remove(fut); // drop the finished future so the next call has to wait for a different one
+        fut = FBUtilities.waitOnFirstFuture(futures, 3);
+        futSleep = (Integer) fut.get();
+        assertEquals(futSleep, 20);
+        futures.remove(fut);
+        fut = FBUtilities.waitOnFirstFuture(futures, 3);
+        futSleep = (Integer) fut.get();
+        assertEquals(futSleep, 30);
+        futures.remove(fut);
+        fut = FBUtilities.waitOnFirstFuture(futures, 3); // only the 40 ms task is left at this point
+        futSleep = (Integer) fut.get();
+        assertEquals(futSleep, 40);
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org