You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/11/10 10:11:01 UTC
[03/10] cassandra git commit: Make IndexSummaryManagerTest::testCancelIndex more deterministic
Make IndexSummaryManagerTest::testCancelIndex more deterministic
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-12808
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ca85bec1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ca85bec1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ca85bec1
Branch: refs/heads/cassandra-3.X
Commit: ca85bec1df69140485eb956cb61be9b68be708e0
Parents: 3de6e9d
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Nov 9 15:49:16 2016 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Thu Nov 10 09:56:31 2016 +0000
----------------------------------------------------------------------
.../io/sstable/IndexSummaryManager.java | 14 +++--
.../io/sstable/IndexSummaryManagerTest.java | 58 ++++++++++++++++++--
2 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 4438dc1..0e9073f 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -225,7 +225,9 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
Pair<List<SSTableReader>, Map<UUID, LifecycleTransaction>> compactingAndNonCompacting = getCompactingAndNonCompactingSSTables();
try
{
- redistributeSummaries(compactingAndNonCompacting.left, compactingAndNonCompacting.right, this.memoryPoolBytes);
+ redistributeSummaries(new IndexSummaryRedistribution(compactingAndNonCompacting.left,
+ compactingAndNonCompacting.right,
+ this.memoryPoolBytes));
}
finally
{
@@ -237,14 +239,14 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
/**
* Attempts to fairly distribute a fixed pool of memory for index summaries across a set of SSTables based on
* their recent read rates.
- * @param transactions containing the sstables we are to redistribute the memory pool across
- * @param memoryPoolBytes a size (in bytes) that the total index summary space usage should stay close to or
- * under, if possible
+ * @param redistribution encapsulating the transactions containing the sstables we are to redistribute the
+ * memory pool across and a size (in bytes) that the total index summary space usage
+ * should stay close to or under, if possible
* @return a list of new SSTableReader instances
*/
@VisibleForTesting
- public static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting, Map<UUID, LifecycleTransaction> transactions, long memoryPoolBytes) throws IOException
+ public static List<SSTableReader> redistributeSummaries(IndexSummaryRedistribution redistribution) throws IOException
{
- return CompactionManager.instance.runIndexSummaryRedistribution(new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes));
+ return CompactionManager.instance.runIndexSummaryRedistribution(redistribution);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ca85bec1/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 6935680..a1c0e77 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -56,7 +56,6 @@ import static java.util.Arrays.asList;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.DOWNSAMPLE_THESHOLD;
import static org.apache.cassandra.io.sstable.IndexSummaryRedistribution.UPSAMPLE_THRESHOLD;
-import static org.apache.cassandra.io.sstable.IndexSummaryManager.redistributeSummaries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -90,8 +89,7 @@ public class IndexSummaryManagerTest
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDRACE)
.minIndexInterval(8)
.maxIndexInterval(256)
- .caching(CachingOptions.NONE)
- );
+ .caching(CachingOptions.NONE));
}
@Before
@@ -109,7 +107,7 @@ public class IndexSummaryManagerTest
@After
public void afterTest()
{
- for (CompactionInfo.Holder holder: CompactionMetrics.getCompactions())
+ for (CompactionInfo.Holder holder : CompactionMetrics.getCompactions())
{
holder.stop();
}
@@ -193,7 +191,8 @@ public class IndexSummaryManagerTest
try
{
future.get();
- } catch (InterruptedException | ExecutionException e)
+ }
+ catch (InterruptedException | ExecutionException e)
{
throw new RuntimeException(e);
}
@@ -610,6 +609,8 @@ public class IndexSummaryManagerTest
// everything should get cut in half
final AtomicReference<CompactionInterruptedException> exception = new AtomicReference<>();
+ // barrier to control when redistribution runs
+ final CountDownLatch barrier = new CountDownLatch(1);
Thread t = new Thread(new Runnable()
{
@@ -620,7 +621,10 @@ public class IndexSummaryManagerTest
// Don't leave enough space for even the minimal index summaries
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
{
- redistributeSummaries(Collections.EMPTY_LIST, of(cfs.metadata.cfId, txn), singleSummaryOffHeapSpace);
+ IndexSummaryManager.redistributeSummaries(new ObservableRedistribution(Collections.EMPTY_LIST,
+ of(cfs.metadata.cfId, txn),
+ singleSummaryOffHeapSpace,
+ barrier));
}
}
catch (CompactionInterruptedException ex)
@@ -635,7 +639,12 @@ public class IndexSummaryManagerTest
t.start();
while (CompactionManager.instance.getActiveCompactions() == 0 && t.isAlive())
Thread.yield();
+ // to ensure that the stop condition check in IndexSummaryRedistribution::redistributeSummaries
+ // is made *after* the halt request is made to the CompactionManager, don't allow the redistribution
+ // to proceed until stopCompaction has been called.
CompactionManager.instance.stopCompaction("INDEX_SUMMARY");
+ // allows the redistribution to proceed
+ barrier.countDown();
t.join();
assertNotNull("Expected compaction interrupted exception", exception.get());
@@ -650,4 +659,41 @@ public class IndexSummaryManagerTest
validateData(cfs, numRows);
}
+
+    /**
+     * Test-local convenience wrapper: builds a plain {@link IndexSummaryRedistribution} from the
+     * given arguments and passes it to {@link IndexSummaryManager#redistributeSummaries}.
+     *
+     * @param compacting sstables handed to the redistribution as its first constructor argument
+     * @param transactions transactions, keyed by UUID, handed to the redistribution
+     * @param memoryPoolBytes memory pool size in bytes handed to the redistribution
+     * @return whatever {@code IndexSummaryManager.redistributeSummaries} returns
+     * @throws IOException if the underlying redistribution throws it
+     */
+    private static List<SSTableReader> redistributeSummaries(List<SSTableReader> compacting,
+                                                             Map<UUID, LifecycleTransaction> transactions,
+                                                             long memoryPoolBytes)
+    throws IOException
+    {
+        IndexSummaryRedistribution plainRedistribution =
+            new IndexSummaryRedistribution(compacting, transactions, memoryPoolBytes);
+        return IndexSummaryManager.redistributeSummaries(plainRedistribution);
+    }
+
+    /**
+     * Test-only {@link IndexSummaryRedistribution} whose {@code redistributeSummaries()} blocks on a
+     * caller-supplied latch before delegating to the superclass, letting the test decide when the
+     * real redistribution work is allowed to start.
+     */
+    private static class ObservableRedistribution extends IndexSummaryRedistribution
+    {
+        // assigned once in the constructor and never rebound
+        private final CountDownLatch barrier;
+
+        /**
+         * @param compacting passed through to the superclass constructor
+         * @param transactions passed through to the superclass constructor
+         * @param memoryPoolBytes passed through to the superclass constructor
+         * @param barrier latch that must reach zero before redistribution proceeds
+         */
+        ObservableRedistribution(List<SSTableReader> compacting,
+                                 Map<UUID, LifecycleTransaction> transactions,
+                                 long memoryPoolBytes,
+                                 CountDownLatch barrier)
+        {
+            super(compacting, transactions, memoryPoolBytes);
+            this.barrier = barrier;
+        }
+
+        /**
+         * Waits for the barrier to be released, then runs the superclass redistribution.
+         *
+         * @return the result of {@code super.redistributeSummaries()}
+         * @throws IOException if the superclass redistribution throws it
+         * @throws RuntimeException if the waiting thread is interrupted before the barrier opens
+         */
+        @Override
+        public List<SSTableReader> redistributeSummaries() throws IOException
+        {
+            try
+            {
+                barrier.await();
+            }
+            catch (InterruptedException e)
+            {
+                // restore the interrupt flag so code further up the stack can still observe it,
+                // and keep the original exception as the cause instead of discarding it
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted waiting on test barrier", e);
+            }
+            return super.redistributeSummaries();
+        }
+    }
}