You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/08/25 18:21:22 UTC
cassandra git commit: Fix flaky test failure: SSTableLoaderTest
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 65ee15f98 -> 8afc76ae6
Fix flaky test failure: SSTableLoaderTest
Patch by blambov; reviewed by jmckenzie for CASSANDRA-10118
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8afc76ae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8afc76ae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8afc76ae
Branch: refs/heads/cassandra-3.0
Commit: 8afc76ae63cd97fb7188653b6e58e4b2149f5d77
Parents: 65ee15f
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Aug 25 12:20:08 2015 -0400
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Tue Aug 25 12:20:08 2015 -0400
----------------------------------------------------------------------
.../cassandra/io/sstable/SSTableLoaderTest.java | 39 ++++++++++++++++++--
1 file changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8afc76ae/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index dfd7821..3370e56 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -18,9 +18,12 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import com.google.common.io.Files;
+
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -38,6 +41,9 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
@@ -113,8 +119,9 @@ public class SSTableLoaderTest
writer.addRow("key1", "col1", "100");
}
+ final CountDownLatch latch = new CountDownLatch(1);
SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
- loader.stream().get();
+ loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build());
@@ -123,6 +130,10 @@ public class SSTableLoaderTest
assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(new Clustering(ByteBufferUtil.bytes("col1")))
.getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
.value());
+
+ // The stream future is signalled when the work is complete but before releasing references. Wait for release
+ // before cleanup (CASSANDRA-10118).
+ latch.await();
}
@Test
@@ -153,9 +164,10 @@ public class SSTableLoaderTest
//make sure we have some tables...
assertTrue(dataDir.listFiles().length > 0);
+ final CountDownLatch latch = new CountDownLatch(2);
//writer is still open so loader should not load anything
SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
- loader.stream().get();
+ loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
@@ -165,9 +177,30 @@ public class SSTableLoaderTest
writer.close();
loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
- loader.stream().get();
+ loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
assertEquals(1000, partitions.size());
+
+ // The stream future is signalled when the work is complete but before releasing references. Wait for release
+ // before cleanup (CASSANDRA-10118).
+ latch.await();
+ }
+
+ StreamEventHandler completionStreamListener(final CountDownLatch latch)
+ {
+ return new StreamEventHandler() {
+ public void onFailure(Throwable arg0)
+ {
+ latch.countDown();
+ }
+
+ public void onSuccess(StreamState arg0)
+ {
+ latch.countDown();
+ }
+
+ public void handleStreamEvent(StreamEvent event) {}
+ };
}
}