You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/08/25 18:21:22 UTC

cassandra git commit: Fix flaky test failure: SSTableLoaderTest

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 65ee15f98 -> 8afc76ae6


Fix flaky test failure: SSTableLoaderTest

Patch by blambov; reviewed by jmckenzie for CASSANDRA-10118


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8afc76ae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8afc76ae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8afc76ae

Branch: refs/heads/cassandra-3.0
Commit: 8afc76ae63cd97fb7188653b6e58e4b2149f5d77
Parents: 65ee15f
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Aug 25 12:20:08 2015 -0400
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Tue Aug 25 12:20:08 2015 -0400

----------------------------------------------------------------------
 .../cassandra/io/sstable/SSTableLoaderTest.java | 39 ++++++++++++++++++--
 1 file changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8afc76ae/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index dfd7821..3370e56 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -18,9 +18,12 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import com.google.common.io.Files;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -38,6 +41,9 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
@@ -113,8 +119,9 @@ public class SSTableLoaderTest
             writer.addRow("key1", "col1", "100");
         }
 
+        final CountDownLatch latch = new CountDownLatch(1);
         SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
-        loader.stream().get();
+        loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build());
 
@@ -123,6 +130,10 @@ public class SSTableLoaderTest
         assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(new Clustering(ByteBufferUtil.bytes("col1")))
                                                                    .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
                                                                    .value());
+
+        // The stream future is signalled when the work is complete but before releasing references. Wait for release
+        // before cleanup (CASSANDRA-10118).
+        latch.await();
     }
 
     @Test
@@ -153,9 +164,10 @@ public class SSTableLoaderTest
         //make sure we have some tables...
         assertTrue(dataDir.listFiles().length > 0);
 
+        final CountDownLatch latch = new CountDownLatch(2);
         //writer is still open so loader should not load anything
         SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
-        loader.stream().get();
+        loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
 
@@ -165,9 +177,30 @@ public class SSTableLoaderTest
         writer.close();
 
         loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
-        loader.stream().get();
+        loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
         partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
         assertEquals(1000, partitions.size());
+
+        // The stream future is signalled when the work is complete but before releasing references. Wait for release
+        // before cleanup (CASSANDRA-10118).
+        latch.await();
+    }
+
+    StreamEventHandler completionStreamListener(final CountDownLatch latch)
+    {
+        return new StreamEventHandler() {
+            public void onFailure(Throwable arg0)
+            {
+                latch.countDown();
+            }
+
+            public void onSuccess(StreamState arg0)
+            {
+                latch.countDown();
+            }
+
+            public void handleStreamEvent(StreamEvent event) {}
+        };
     }
 }