You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/11/10 10:11:00 UTC

[02/10] cassandra git commit: Make IndexSummaryManagerTest::testCancelIndex more deterministic

Make IndexSummaryManagerTest::testCancelIndex more deterministic

Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-12808


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ca85bec1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ca85bec1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ca85bec1

Branch: refs/heads/cassandra-3.0
Commit: ca85bec1df69140485eb956cb61be9b68be708e0
Parents: 3de6e9d
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Nov 9 15:49:16 2016 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Nov 10 09:56:31 2016 +0000

----------------------------------------------------------------------
 .../io/sstable/IndexSummaryManager.java         | 14 +++--
 .../io/sstable/IndexSummaryManagerTest.java     | 58 ++++++++++++++++++--
 2 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 4438dc1..0e9073f 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -225,7 +225,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
         try
         {
-            redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes);
+            redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left,
+                                                                 compactingAndNonCompacting.right,
+                                                                 this.memoryPoolBytes));
         }
         finally
         {
@@ -237,14 +239,14 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
     /**
      * Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
      * their recent read rates.
-     * @param transactions containing the sstables we are to redistribute the memory pool across
-     * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
-     *                        under, if possible
+     * @param redistribution encapsulating the transactions containing the sstables we are to redistribute the
+     *                       memory pool across and a size (in bytes) that the total index summary space usage
+     *                       should stay close to or under, if possible
      * @return a list of new SSTableReader instances
      */
     @VisibleForTesting
-    public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
+    public static List<SSTableReader> redistributeSummaries(IndexSummaryRedistribution redistribution) throws IOException
     {
-        return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
+        return CompactionManager.instance.runIndexSummaryRedistribution(redistribution);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 6935680..a1c0e77 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -56,7 +56,6 @@ import static java.util.Arrays.asList;
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
 import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
-import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -90,8 +89,7 @@ public class IndexSummaryManagerTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDRACE)
                                                 .minIndexInterval(8)
                                                 .maxIndexInterval(256)
-                                                .caching(CachingOptions.NONE)
-                                    );
+                                                .caching(CachingOptions.NONE));
     }
 
     @Before
@@ -109,7 +107,7 @@ public class IndexSummaryManagerTest
     @After
     public void afterTest()
     {
-        for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+        for (CompactionInfo.Holder holder : CompactionMetrics.getCompactions())
         {
             holder.stop();
         }
@@ -193,7 +191,8 @@ public class IndexSummaryManagerTest
             try
             {
                 future.get();
-            } catch (InterruptedException | ExecutionException e)
+            }
+            catch (InterruptedException | ExecutionException e)
             {
                 throw new RuntimeException(e);
             }
@@ -610,6 +609,8 @@ public class IndexSummaryManagerTest
 
         // everything should get cut in half
         final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+        // barrier to control when redistribution runs
+        final CountDownLatch barrier = new CountDownLatch(1);
 
         Thread t = new Thread(new Runnable()
         {
@@ -620,7 +621,10 @@ public class IndexSummaryManagerTest
                     // Don't leave enough space for even the minimal index summaries
                     try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
                     {
-                        redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+                        IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST,
+                                                                                               of(cfs.metadata.cfId, txn),
+                                                                                               singleSummaryOffHeapSpace,
+                                                                                               barrier));
                     }
                 }
                 catch (CompactionInterruptedException ex)
@@ -635,7 +639,12 @@ public class IndexSummaryManagerTest
         t.start();
         while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
             Thread.yield();
+        // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
+        // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
+        // to proceed until stopCompaction has been called.
         CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+        // allows the redistribution to proceed
+        barrier.countDown();
         t.join();
 
         assertNotNull("Expected compaction interrupted exception", exception.get());
@@ -650,4 +659,41 @@ public class IndexSummaryManagerTest
 
         validateData(cfs, numRows);
     }
+
+    // Wraps the three arguments in an IndexSummaryRedistribution and hands it to IndexSummaryManager.
+    private static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting,
+                                                             Map<UUID, LifecycleTransaction> transactions,
+                                                             long memoryPoolBytes) throws IOException
+    {
+        IndexSummaryRedistribution redistribution =
+            new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes);
+        return IndexSummaryManager.redistributeSummaries(redistribution);
+    }
+
+    // Redistribution that waits on the supplied latch before doing any work, so the test picks the start.
+    private static class ObservableRedistribution extends IndexSummaryRedistribution
+    {
+        private final CountDownLatch barrier; // released by the test thread via countDown()
+        ObservableRedistribution(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions,
+                                 long memoryPoolBytes, CountDownLatch barrier)
+        {
+            super(compacting, transactions, memoryPoolBytes);
+            this.barrier = barrier;
+        }
+
+        @Override
+        public List<SSTableReader> redistributeSummaries() throws IOException
+        {
+            try
+            {
+                barrier.await(); // block until the latch is counted down
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt(); // keep the interrupt visible further up the stack
+                throw new RuntimeException("Interrupted waiting on test barrier", e); // preserve the cause
+            }
+            return super.redistributeSummaries();
+        }
+    }
 }