You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/06 14:59:54 UTC

[01/10] cassandra git commit: Fix WaitQueueTest flakiness

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 20f12e974 -> e1bb79260
  refs/heads/cassandra-2.2 32bc8b0b1 -> 7636a6b86
  refs/heads/cassandra-3.0 ace28c928 -> bf4740867
  refs/heads/trunk 9cd4f8293 -> fe8fbc8f1


Fix WaitQueueTest flakiness


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1bb7926
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1bb7926
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1bb7926

Branch: refs/heads/cassandra-2.1
Commit: e1bb79260a9bdf478895724ea180cf0c2efb37ff
Parents: 20f12e9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:45:54 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:45:54 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        |  5 ++
 .../cassandra/concurrent/WaitQueueTest.java     | 91 ++++++--------------
 2 files changed, 32 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1015be6..e05468f 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -376,4 +376,9 @@ public class Util
         Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish));
         return new RangeTombstone(startName, endName, timestamp , localtime);
     }
+
+    public static void joinThread(Thread thread) throws InterruptedException
+    {
+        thread.join(10000);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index 3e7cb7b..fdc6880 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -21,10 +21,13 @@ package org.apache.cassandra.concurrent;
  */
 
 
+import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.junit.*;
 
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
 
@@ -38,6 +41,7 @@ public class WaitQueueTest
     }
     public void testSerial(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicInteger ready = new AtomicInteger();
         Thread[] ts = new Thread[4];
         for (int i = 0 ; i < ts.length ; i++)
             ts[i] = new Thread(new Runnable()
@@ -46,6 +50,7 @@ public class WaitQueueTest
             public void run()
             {
                 WaitQueue.Signal wait = queue.register();
+                ready.incrementAndGet();
                 try
                 {
                     wait.await();
@@ -55,68 +60,28 @@ public class WaitQueueTest
                 }
             }
         });
-        for (int i = 0 ; i < ts.length ; i++)
-            ts[i].start();
-        Thread.sleep(100);
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        for (int i = 0 ; i < ts.length ; i++)
+        for (Thread t : ts)
+            t.start();
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (ready.get() < ts.length)
+            random.nextLong();
+        for (Thread t : ts)
+            queue.signal();
+        for (Thread t : ts)
         {
-            ts[i].join(100);
-            assertFalse(queue.getClass().getName(), ts[i].isAlive());
+            Util.joinThread(t);
+            assertFalse(queue.getClass().getName(), t.isAlive());
         }
     }
 
-
-    @Test
-    public void testCondition1() throws InterruptedException
-    {
-        testCondition1(new WaitQueue());
-    }
-
-    public void testCondition1(final WaitQueue queue) throws InterruptedException
-    {
-        final AtomicBoolean cond1 = new AtomicBoolean(false);
-        final AtomicBoolean fail = new AtomicBoolean(false);
-        Thread t1 = new Thread(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    Thread.sleep(200);
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
-                WaitQueue.Signal wait = queue.register();
-                if (!cond1.get())
-                {
-                    System.err.println("Condition should have already been met");
-                    fail.set(true);
-                }
-            }
-        });
-        t1.start();
-        Thread.sleep(50);
-        cond1.set(true);
-        Thread.sleep(300);
-        queue.signal();
-        t1.join(300);
-        assertFalse(queue.getClass().getName(), t1.isAlive());
-        assertFalse(fail.get());
-    }
-
     @Test
-    public void testCondition2() throws InterruptedException
+    public void testCondition() throws InterruptedException
     {
-        testCondition2(new WaitQueue());
+        testCondition(new WaitQueue());
     }
-    public void testCondition2(final WaitQueue queue) throws InterruptedException
+    public void testCondition(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicBoolean ready = new AtomicBoolean(false);
         final AtomicBoolean condition = new AtomicBoolean(false);
         final AtomicBoolean fail = new AtomicBoolean(false);
         Thread t = new Thread(new Runnable()
@@ -129,16 +94,12 @@ public class WaitQueueTest
                 {
                     System.err.println("");
                     fail.set(true);
+                    ready.set(true);
+                    return;
                 }
 
-                try
-                {
-                    Thread.sleep(200);
-                    wait.await();
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
+                ready.set(true);
+                wait.awaitUninterruptibly();
                 if (!condition.get())
                 {
                     System.err.println("Woke up when condition not met");
@@ -147,10 +108,12 @@ public class WaitQueueTest
             }
         });
         t.start();
-        Thread.sleep(50);
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (!ready.get())
+            random.nextLong();
         condition.set(true);
         queue.signal();
-        t.join(300);
+        Util.joinThread(t);
         assertFalse(queue.getClass().getName(), t.isAlive());
         assertFalse(fail.get());
     }


[07/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	test/unit/org/apache/cassandra/Util.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7636a6b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7636a6b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7636a6b8

Branch: refs/heads/trunk
Commit: 7636a6b860cddb614b92721b9886fdbeb23887f9
Parents: 32bc8b0 e1bb792
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:46:38 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:46:38 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        |  6 +-
 .../cassandra/concurrent/WaitQueueTest.java     | 91 ++++++--------------
 2 files changed, 32 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7636a6b8/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index 08cc093,e05468f..da81aaa
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -382,16 -377,8 +382,20 @@@ public class Util
          return new RangeTombstone(startName, endName, timestamp , localtime);
      }
  
- 
 +    public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
 +    {
 +        long now = System.currentTimeMillis();
 +        while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
 +        {
 +            if (s.get().equals(expected))
 +                break;
 +            Thread.yield();
 +        }
 +        assertEquals(expected, s.get());
 +    }
++
+     public static void joinThread(Thread thread) throws InterruptedException
+     {
+         thread.join(10000);
+     }
  }


[04/10] cassandra git commit: Fix WaitQueueTest flakiness

Posted by be...@apache.org.
Fix WaitQueueTest flakiness


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1bb7926
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1bb7926
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1bb7926

Branch: refs/heads/trunk
Commit: e1bb79260a9bdf478895724ea180cf0c2efb37ff
Parents: 20f12e9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:45:54 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:45:54 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        |  5 ++
 .../cassandra/concurrent/WaitQueueTest.java     | 91 ++++++--------------
 2 files changed, 32 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1015be6..e05468f 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -376,4 +376,9 @@ public class Util
         Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish));
         return new RangeTombstone(startName, endName, timestamp , localtime);
     }
+
+    public static void joinThread(Thread thread) throws InterruptedException
+    {
+        thread.join(10000);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index 3e7cb7b..fdc6880 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -21,10 +21,13 @@ package org.apache.cassandra.concurrent;
  */
 
 
+import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.junit.*;
 
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
 
@@ -38,6 +41,7 @@ public class WaitQueueTest
     }
     public void testSerial(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicInteger ready = new AtomicInteger();
         Thread[] ts = new Thread[4];
         for (int i = 0 ; i < ts.length ; i++)
             ts[i] = new Thread(new Runnable()
@@ -46,6 +50,7 @@ public class WaitQueueTest
             public void run()
             {
                 WaitQueue.Signal wait = queue.register();
+                ready.incrementAndGet();
                 try
                 {
                     wait.await();
@@ -55,68 +60,28 @@ public class WaitQueueTest
                 }
             }
         });
-        for (int i = 0 ; i < ts.length ; i++)
-            ts[i].start();
-        Thread.sleep(100);
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        for (int i = 0 ; i < ts.length ; i++)
+        for (Thread t : ts)
+            t.start();
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (ready.get() < ts.length)
+            random.nextLong();
+        for (Thread t : ts)
+            queue.signal();
+        for (Thread t : ts)
         {
-            ts[i].join(100);
-            assertFalse(queue.getClass().getName(), ts[i].isAlive());
+            Util.joinThread(t);
+            assertFalse(queue.getClass().getName(), t.isAlive());
         }
     }
 
-
-    @Test
-    public void testCondition1() throws InterruptedException
-    {
-        testCondition1(new WaitQueue());
-    }
-
-    public void testCondition1(final WaitQueue queue) throws InterruptedException
-    {
-        final AtomicBoolean cond1 = new AtomicBoolean(false);
-        final AtomicBoolean fail = new AtomicBoolean(false);
-        Thread t1 = new Thread(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    Thread.sleep(200);
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
-                WaitQueue.Signal wait = queue.register();
-                if (!cond1.get())
-                {
-                    System.err.println("Condition should have already been met");
-                    fail.set(true);
-                }
-            }
-        });
-        t1.start();
-        Thread.sleep(50);
-        cond1.set(true);
-        Thread.sleep(300);
-        queue.signal();
-        t1.join(300);
-        assertFalse(queue.getClass().getName(), t1.isAlive());
-        assertFalse(fail.get());
-    }
-
     @Test
-    public void testCondition2() throws InterruptedException
+    public void testCondition() throws InterruptedException
     {
-        testCondition2(new WaitQueue());
+        testCondition(new WaitQueue());
     }
-    public void testCondition2(final WaitQueue queue) throws InterruptedException
+    public void testCondition(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicBoolean ready = new AtomicBoolean(false);
         final AtomicBoolean condition = new AtomicBoolean(false);
         final AtomicBoolean fail = new AtomicBoolean(false);
         Thread t = new Thread(new Runnable()
@@ -129,16 +94,12 @@ public class WaitQueueTest
                 {
                     System.err.println("");
                     fail.set(true);
+                    ready.set(true);
+                    return;
                 }
 
-                try
-                {
-                    Thread.sleep(200);
-                    wait.await();
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
+                ready.set(true);
+                wait.awaitUninterruptibly();
                 if (!condition.get())
                 {
                     System.err.println("Woke up when condition not met");
@@ -147,10 +108,12 @@ public class WaitQueueTest
             }
         });
         t.start();
-        Thread.sleep(50);
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (!ready.get())
+            random.nextLong();
         condition.set(true);
         queue.signal();
-        t.join(300);
+        Util.joinThread(t);
         assertFalse(queue.getClass().getName(), t.isAlive());
         assertFalse(fail.get());
     }


[05/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	test/unit/org/apache/cassandra/Util.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7636a6b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7636a6b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7636a6b8

Branch: refs/heads/cassandra-3.0
Commit: 7636a6b860cddb614b92721b9886fdbeb23887f9
Parents: 32bc8b0 e1bb792
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:46:38 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:46:38 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        |  6 +-
 .../cassandra/concurrent/WaitQueueTest.java     | 91 ++++++--------------
 2 files changed, 32 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7636a6b8/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index 08cc093,e05468f..da81aaa
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -382,16 -377,8 +382,20 @@@ public class Util
          return new RangeTombstone(startName, endName, timestamp , localtime);
      }
  
- 
 +    public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
 +    {
 +        long now = System.currentTimeMillis();
 +        while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
 +        {
 +            if (s.get().equals(expected))
 +                break;
 +            Thread.yield();
 +        }
 +        assertEquals(expected, s.get());
 +    }
++
+     public static void joinThread(Thread thread) throws InterruptedException
+     {
+         thread.join(10000);
+     }
  }


[08/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by be...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0

Conflicts:
	test/unit/org/apache/cassandra/Util.java
	test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf474086
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf474086
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf474086

Branch: refs/heads/cassandra-3.0
Commit: bf4740867a4a9d54d06c8c91802e02793b2bde2f
Parents: ace28c9 7636a6b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:51:27 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:51:27 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        | 24 ++++++++++----------
 .../cassandra/concurrent/WaitQueueTest.java     |  2 +-
 2 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf474086/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index 7efe6f4,da81aaa..358168f
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -250,283 -333,67 +250,283 @@@ public class Util
          assert thrown : exception.getName() + " not received";
      }
  
 -    public static QueryFilter namesQueryFilter(ColumnFamilyStore cfs, DecoratedKey key)
 +    public static AbstractReadCommandBuilder.SinglePartitionBuilder cmd(ColumnFamilyStore cfs, Object... partitionKey)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        return QueryFilter.getNamesFilter(key, cfs.name, s, System.currentTimeMillis());
 +        return new AbstractReadCommandBuilder.SinglePartitionBuilder(cfs, makeKey(cfs.metadata, partitionKey));
      }
  
 -    public static QueryFilter namesQueryFilter(ColumnFamilyStore cfs, DecoratedKey key, String... names)
 +    public static AbstractReadCommandBuilder.PartitionRangeBuilder cmd(ColumnFamilyStore cfs)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        for (String str : names)
 -            s.add(cellname(str));
 -        return QueryFilter.getNamesFilter(key, cfs.name, s, System.currentTimeMillis());
 +        return new AbstractReadCommandBuilder.PartitionRangeBuilder(cfs);
      }
  
 -    public static QueryFilter namesQueryFilter(ColumnFamilyStore cfs, DecoratedKey key, CellName... names)
 +    static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        for (CellName n : names)
 -            s.add(n);
 -        return QueryFilter.getNamesFilter(key, cfs.name, s, System.currentTimeMillis());
 +        if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
 +            return (DecoratedKey)partitionKey[0];
 +
 +        ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
 +        return metadata.decorateKey(key);
      }
  
 -    public static NamesQueryFilter namesFilter(ColumnFamilyStore cfs, String... names)
 +    public static void assertEmptyUnfiltered(ReadCommand command)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        for (String str : names)
 -            s.add(cellname(str));
 -        return new NamesQueryFilter(s);
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
 +        {
 +            if (iterator.hasNext())
 +            {
 +                try (UnfilteredRowIterator partition = iterator.next())
 +                {
 +                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
 +                }
 +            }
 +        }
      }
  
 -    public static String string(ByteBuffer bb)
 +    public static void assertEmpty(ReadCommand command)
      {
 -        try
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); PartitionIterator iterator = command.executeInternal(orderGroup))
 +        {
 +            if (iterator.hasNext())
 +            {
 +                try (RowIterator partition = iterator.next())
 +                {
 +                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
 +                }
 +            }
 +        }
 +    }
 +
 +    public static List<ArrayBackedPartition> getAllUnfiltered(ReadCommand command)
 +    {
 +        List<ArrayBackedPartition> results = new ArrayList<>();
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
 +        {
 +            while (iterator.hasNext())
 +            {
 +                try (UnfilteredRowIterator partition = iterator.next())
 +                {
 +                    results.add(ArrayBackedPartition.create(partition));
 +                }
 +            }
 +        }
 +        return results;
 +    }
 +
 +    public static List<FilteredPartition> getAll(ReadCommand command)
 +    {
 +        List<FilteredPartition> results = new ArrayList<>();
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); PartitionIterator iterator = command.executeInternal(orderGroup))
 +        {
 +            while (iterator.hasNext())
 +            {
 +                try (RowIterator partition = iterator.next())
 +                {
 +                    results.add(FilteredPartition.create(partition));
 +                }
 +            }
 +        }
 +        return results;
 +    }
 +
 +    public static Row getOnlyRowUnfiltered(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting one row in one partition but got nothing";
 +            try (UnfilteredRowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +
 +                assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
 +                Row row = ((Row)partition.next());
 +                assert !partition.hasNext() : "Expecting a single row but got more";
 +                return row;
 +            }
 +        }
 +    }
 +
 +    public static Row getOnlyRow(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator iterator = cmd.executeInternal(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting one row in one partition but got nothing";
 +            try (RowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +                assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
 +                Row row = partition.next();
 +                assert !partition.hasNext() : "Expecting a single row but got more";
 +                return row;
 +            }
 +        }
 +    }
 +
 +    public static ArrayBackedPartition getOnlyPartitionUnfiltered(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting a single partition but got nothing";
 +            try (UnfilteredRowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +                return ArrayBackedPartition.create(partition);
 +            }
 +        }
 +    }
 +
 +    public static FilteredPartition getOnlyPartition(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator iterator = cmd.executeInternal(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting a single partition but got nothing";
 +            try (RowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +                return FilteredPartition.create(partition);
 +            }
 +        }
 +    }
 +
 +    public static UnfilteredRowIterator apply(Mutation mutation)
 +    {
 +        mutation.apply();
 +        assert mutation.getPartitionUpdates().size() == 1;
 +        return mutation.getPartitionUpdates().iterator().next().unfilteredIterator();
 +    }
 +
 +    public static Cell cell(ColumnFamilyStore cfs, Row row, String columnName)
 +    {
 +        ColumnDefinition def = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes(columnName));
 +        assert def != null;
 +        return row.getCell(def);
 +    }
 +
 +    public static Row row(Partition partition, Object... clustering)
 +    {
 +        return partition.getRow(partition.metadata().comparator.make(clustering));
 +    }
 +
 +    public static void assertCellValue(Object value, ColumnFamilyStore cfs, Row row, String columnName)
 +    {
 +        Cell cell = cell(cfs, row, columnName);
 +        assert cell != null : "Row " + row.toString(cfs.metadata) + " has no cell for " + columnName;
 +        assertEquals(value, cell.column().type.compose(cell.value()));
 +    }
 +
 +    public static void consume(UnfilteredRowIterator iter)
 +    {
 +        try (UnfilteredRowIterator iterator = iter)
          {
 -            return ByteBufferUtil.string(bb);
 +            while (iter.hasNext())
 +                iter.next();
          }
 -        catch (Exception e)
 +    }
 +
 +    public static int size(PartitionIterator iter)
 +    {
 +        int size = 0;
 +        while (iter.hasNext())
          {
 -            throw new RuntimeException(e);
 +            ++size;
 +            iter.next().close();
          }
 +        return size;
      }
  
 -    public static RangeTombstone tombstone(String start, String finish, long timestamp, int localtime)
 +    public static CBuilder getCBuilderForCFM(CFMetaData cfm)
      {
 -        Composite startName = CellNames.simpleDense(ByteBufferUtil.bytes(start));
 -        Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish));
 -        return new RangeTombstone(startName, endName, timestamp , localtime);
 +        List<ColumnDefinition> clusteringColumns = cfm.clusteringColumns();
 +        List<AbstractType<?>> types = new ArrayList<>(clusteringColumns.size());
 +        for (ColumnDefinition def : clusteringColumns)
 +            types.add(def.type);
 +        return CBuilder.create(new ClusteringComparator(types));
 +    }
 +
 +    public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b)
 +    {
 +        return Objects.equals(a.columns(), b.columns())
 +            && Objects.equals(a.metadata(), b.metadata())
 +            && Objects.equals(a.isReverseOrder(), b.isReverseOrder())
 +            && Objects.equals(a.partitionKey(), b.partitionKey())
 +            && Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion())
 +            && Objects.equals(a.staticRow(), b.staticRow())
 +            && Objects.equals(a.stats(), b.stats())
 +            && Iterators.elementsEqual(a, b);
 +    }
 +
 +    // moved & refactored from KeyspaceTest in < 3.0
 +    public static void assertColumns(Row row, String... expectedColumnNames)
 +    {
 +        Iterator<Cell> cells = row == null ? Iterators.<Cell>emptyIterator() : row.cells().iterator();
 +        String[] actual = Iterators.toArray(Iterators.transform(cells, new Function<Cell, String>()
 +        {
 +            public String apply(Cell cell)
 +            {
 +                return cell.column().name.toString();
 +            }
 +        }), String.class);
 +
 +        assert Arrays.equals(actual, expectedColumnNames)
 +        : String.format("Columns [%s])] is not expected [%s]",
 +                        ((row == null) ? "" : row.columns().toString()),
 +                        StringUtils.join(expectedColumnNames, ","));
 +    }
 +
 +    public static void assertColumn(CFMetaData cfm, Row row, String name, String value, long timestamp)
 +    {
 +        Cell cell = row.getCell(cfm.getColumnDefinition(new ColumnIdentifier(name, true)));
 +        assertColumn(cell, value, timestamp);
 +    }
 +
 +    public static void assertColumn(Cell cell, String value, long timestamp)
 +    {
 +        assertNotNull(cell);
 +        assertEquals(0, ByteBufferUtil.compareUnsigned(cell.value(), ByteBufferUtil.bytes(value)));
 +        assertEquals(timestamp, cell.timestamp());
 +    }
 +
 +    public static void assertClustering(CFMetaData cfm, Row row, Object... clusteringValue)
 +    {
 +        assertEquals(row.clustering().size(), clusteringValue.length);
 +        assertEquals(0, cfm.comparator.compare(row.clustering(), cfm.comparator.make(clusteringValue)));
 +    }
 +
-     public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
-     {
-         long now = System.currentTimeMillis();
-         while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
-         {
-             if (s.get().equals(expected))
-                 break;
-             Thread.yield();
-         }
-         assertEquals(expected, s.get());
-     }
- 
 +    public static PartitionerSwitcher switchPartitioner(IPartitioner p)
 +    {
 +        return new PartitionerSwitcher(p);
 +    }
 +
 +    public static class PartitionerSwitcher implements AutoCloseable
 +    {
 +        final IPartitioner oldP;
 +        final IPartitioner newP;
 +
 +        public PartitionerSwitcher(IPartitioner partitioner)
 +        {
 +            newP = partitioner;
 +            oldP = StorageService.instance.setPartitionerUnsafe(partitioner);
 +        }
 +
 +        public void close()
 +        {
 +            IPartitioner p = StorageService.instance.setPartitionerUnsafe(oldP);
 +            assert p == newP;
 +        }
      }
  
+     public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
+     {
+         long now = System.currentTimeMillis();
+         while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
+         {
+             if (s.get().equals(expected))
+                 break;
+             Thread.yield();
+         }
+         assertEquals(expected, s.get());
+     }
+ 
      public static void joinThread(Thread thread) throws InterruptedException
      {
          thread.join(10000);


[10/10] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe8fbc8f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe8fbc8f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe8fbc8f

Branch: refs/heads/trunk
Commit: fe8fbc8f1d2a91d21ef5643577e375334bd20f62
Parents: 9cd4f82 bf47408
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:51:39 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:51:39 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        | 24 ++++++++++----------
 .../cassandra/concurrent/WaitQueueTest.java     |  2 +-
 2 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[02/10] cassandra git commit: Fix WaitQueueTest flakiness

Posted by be...@apache.org.
Fix WaitQueueTest flakiness


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1bb7926
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1bb7926
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1bb7926

Branch: refs/heads/cassandra-2.2
Commit: e1bb79260a9bdf478895724ea180cf0c2efb37ff
Parents: 20f12e9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:45:54 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:45:54 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        |  5 ++
 .../cassandra/concurrent/WaitQueueTest.java     | 91 ++++++--------------
 2 files changed, 32 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1015be6..e05468f 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -376,4 +376,9 @@ public class Util
         Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish));
         return new RangeTombstone(startName, endName, timestamp , localtime);
     }
+
+    public static void joinThread(Thread thread) throws InterruptedException
+    {
+        thread.join(10000);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index 3e7cb7b..fdc6880 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -21,10 +21,13 @@ package org.apache.cassandra.concurrent;
  */
 
 
+import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.junit.*;
 
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
 
@@ -38,6 +41,7 @@ public class WaitQueueTest
     }
     public void testSerial(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicInteger ready = new AtomicInteger();
         Thread[] ts = new Thread[4];
         for (int i = 0 ; i < ts.length ; i++)
             ts[i] = new Thread(new Runnable()
@@ -46,6 +50,7 @@ public class WaitQueueTest
             public void run()
             {
                 WaitQueue.Signal wait = queue.register();
+                ready.incrementAndGet();
                 try
                 {
                     wait.await();
@@ -55,68 +60,28 @@ public class WaitQueueTest
                 }
             }
         });
-        for (int i = 0 ; i < ts.length ; i++)
-            ts[i].start();
-        Thread.sleep(100);
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        for (int i = 0 ; i < ts.length ; i++)
+        for (Thread t : ts)
+            t.start();
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (ready.get() < ts.length)
+            random.nextLong();
+        for (Thread t : ts)
+            queue.signal();
+        for (Thread t : ts)
         {
-            ts[i].join(100);
-            assertFalse(queue.getClass().getName(), ts[i].isAlive());
+            Util.joinThread(t);
+            assertFalse(queue.getClass().getName(), t.isAlive());
         }
     }
 
-
-    @Test
-    public void testCondition1() throws InterruptedException
-    {
-        testCondition1(new WaitQueue());
-    }
-
-    public void testCondition1(final WaitQueue queue) throws InterruptedException
-    {
-        final AtomicBoolean cond1 = new AtomicBoolean(false);
-        final AtomicBoolean fail = new AtomicBoolean(false);
-        Thread t1 = new Thread(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    Thread.sleep(200);
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
-                WaitQueue.Signal wait = queue.register();
-                if (!cond1.get())
-                {
-                    System.err.println("Condition should have already been met");
-                    fail.set(true);
-                }
-            }
-        });
-        t1.start();
-        Thread.sleep(50);
-        cond1.set(true);
-        Thread.sleep(300);
-        queue.signal();
-        t1.join(300);
-        assertFalse(queue.getClass().getName(), t1.isAlive());
-        assertFalse(fail.get());
-    }
-
     @Test
-    public void testCondition2() throws InterruptedException
+    public void testCondition() throws InterruptedException
     {
-        testCondition2(new WaitQueue());
+        testCondition(new WaitQueue());
     }
-    public void testCondition2(final WaitQueue queue) throws InterruptedException
+    public void testCondition(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicBoolean ready = new AtomicBoolean(false);
         final AtomicBoolean condition = new AtomicBoolean(false);
         final AtomicBoolean fail = new AtomicBoolean(false);
         Thread t = new Thread(new Runnable()
@@ -129,16 +94,12 @@ public class WaitQueueTest
                 {
                     System.err.println("");
                     fail.set(true);
+                    ready.set(true);
+                    return;
                 }
 
-                try
-                {
-                    Thread.sleep(200);
-                    wait.await();
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
+                ready.set(true);
+                wait.awaitUninterruptibly();
                 if (!condition.get())
                 {
                     System.err.println("Woke up when condition not met");
@@ -147,10 +108,12 @@ public class WaitQueueTest
             }
         });
         t.start();
-        Thread.sleep(50);
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (!ready.get())
+            random.nextLong();
         condition.set(true);
         queue.signal();
-        t.join(300);
+        Util.joinThread(t);
         assertFalse(queue.getClass().getName(), t.isAlive());
         assertFalse(fail.get());
     }


[06/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	test/unit/org/apache/cassandra/Util.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7636a6b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7636a6b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7636a6b8

Branch: refs/heads/cassandra-2.2
Commit: 7636a6b860cddb614b92721b9886fdbeb23887f9
Parents: 32bc8b0 e1bb792
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:46:38 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:46:38 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        |  6 +-
 .../cassandra/concurrent/WaitQueueTest.java     | 91 ++++++--------------
 2 files changed, 32 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7636a6b8/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index 08cc093,e05468f..da81aaa
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -382,16 -377,8 +382,20 @@@ public class Uti
          return new RangeTombstone(startName, endName, timestamp , localtime);
      }
  
- 
 +    public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
 +    {
 +        long now = System.currentTimeMillis();
 +        while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
 +        {
 +            if (s.get().equals(expected))
 +                break;
 +            Thread.yield();
 +        }
 +        assertEquals(expected, s.get());
 +    }
++
+     public static void joinThread(Thread thread) throws InterruptedException
+     {
+         thread.join(10000);
+     }
  }


[03/10] cassandra git commit: Fix WaitQueueTest flakiness

Posted by be...@apache.org.
Fix WaitQueueTest flakiness


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1bb7926
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1bb7926
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1bb7926

Branch: refs/heads/cassandra-3.0
Commit: e1bb79260a9bdf478895724ea180cf0c2efb37ff
Parents: 20f12e9
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:45:54 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:45:54 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        |  5 ++
 .../cassandra/concurrent/WaitQueueTest.java     | 91 ++++++--------------
 2 files changed, 32 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1015be6..e05468f 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -376,4 +376,9 @@ public class Util
         Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish));
         return new RangeTombstone(startName, endName, timestamp , localtime);
     }
+
+    public static void joinThread(Thread thread) throws InterruptedException
+    {
+        thread.join(10000);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1bb7926/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index 3e7cb7b..fdc6880 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -21,10 +21,13 @@ package org.apache.cassandra.concurrent;
  */
 
 
+import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.junit.*;
 
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
 
@@ -38,6 +41,7 @@ public class WaitQueueTest
     }
     public void testSerial(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicInteger ready = new AtomicInteger();
         Thread[] ts = new Thread[4];
         for (int i = 0 ; i < ts.length ; i++)
             ts[i] = new Thread(new Runnable()
@@ -46,6 +50,7 @@ public class WaitQueueTest
             public void run()
             {
                 WaitQueue.Signal wait = queue.register();
+                ready.incrementAndGet();
                 try
                 {
                     wait.await();
@@ -55,68 +60,28 @@ public class WaitQueueTest
                 }
             }
         });
-        for (int i = 0 ; i < ts.length ; i++)
-            ts[i].start();
-        Thread.sleep(100);
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        queue.signal();
-        for (int i = 0 ; i < ts.length ; i++)
+        for (Thread t : ts)
+            t.start();
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (ready.get() < ts.length)
+            random.nextLong();
+        for (Thread t : ts)
+            queue.signal();
+        for (Thread t : ts)
         {
-            ts[i].join(100);
-            assertFalse(queue.getClass().getName(), ts[i].isAlive());
+            Util.joinThread(t);
+            assertFalse(queue.getClass().getName(), t.isAlive());
         }
     }
 
-
-    @Test
-    public void testCondition1() throws InterruptedException
-    {
-        testCondition1(new WaitQueue());
-    }
-
-    public void testCondition1(final WaitQueue queue) throws InterruptedException
-    {
-        final AtomicBoolean cond1 = new AtomicBoolean(false);
-        final AtomicBoolean fail = new AtomicBoolean(false);
-        Thread t1 = new Thread(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    Thread.sleep(200);
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
-                WaitQueue.Signal wait = queue.register();
-                if (!cond1.get())
-                {
-                    System.err.println("Condition should have already been met");
-                    fail.set(true);
-                }
-            }
-        });
-        t1.start();
-        Thread.sleep(50);
-        cond1.set(true);
-        Thread.sleep(300);
-        queue.signal();
-        t1.join(300);
-        assertFalse(queue.getClass().getName(), t1.isAlive());
-        assertFalse(fail.get());
-    }
-
     @Test
-    public void testCondition2() throws InterruptedException
+    public void testCondition() throws InterruptedException
     {
-        testCondition2(new WaitQueue());
+        testCondition(new WaitQueue());
     }
-    public void testCondition2(final WaitQueue queue) throws InterruptedException
+    public void testCondition(final WaitQueue queue) throws InterruptedException
     {
+        final AtomicBoolean ready = new AtomicBoolean(false);
         final AtomicBoolean condition = new AtomicBoolean(false);
         final AtomicBoolean fail = new AtomicBoolean(false);
         Thread t = new Thread(new Runnable()
@@ -129,16 +94,12 @@ public class WaitQueueTest
                 {
                     System.err.println("");
                     fail.set(true);
+                    ready.set(true);
+                    return;
                 }
 
-                try
-                {
-                    Thread.sleep(200);
-                    wait.await();
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
+                ready.set(true);
+                wait.awaitUninterruptibly();
                 if (!condition.get())
                 {
                     System.err.println("Woke up when condition not met");
@@ -147,10 +108,12 @@ public class WaitQueueTest
             }
         });
         t.start();
-        Thread.sleep(50);
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        while (!ready.get())
+            random.nextLong();
         condition.set(true);
         queue.signal();
-        t.join(300);
+        Util.joinThread(t);
         assertFalse(queue.getClass().getName(), t.isAlive());
         assertFalse(fail.get());
     }


[09/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by be...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0

Conflicts:
	test/unit/org/apache/cassandra/Util.java
	test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf474086
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf474086
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf474086

Branch: refs/heads/trunk
Commit: bf4740867a4a9d54d06c8c91802e02793b2bde2f
Parents: ace28c9 7636a6b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Aug 6 14:51:27 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Aug 6 14:51:27 2015 +0200

----------------------------------------------------------------------
 test/unit/org/apache/cassandra/Util.java        | 24 ++++++++++----------
 .../cassandra/concurrent/WaitQueueTest.java     |  2 +-
 2 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf474086/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index 7efe6f4,da81aaa..358168f
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -250,283 -333,67 +250,283 @@@ public class Uti
          assert thrown : exception.getName() + " not received";
      }
  
 -    public static QueryFilter namesQueryFilter(ColumnFamilyStore cfs, DecoratedKey key)
 +    public static AbstractReadCommandBuilder.SinglePartitionBuilder cmd(ColumnFamilyStore cfs, Object... partitionKey)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        return QueryFilter.getNamesFilter(key, cfs.name, s, System.currentTimeMillis());
 +        return new AbstractReadCommandBuilder.SinglePartitionBuilder(cfs, makeKey(cfs.metadata, partitionKey));
      }
  
 -    public static QueryFilter namesQueryFilter(ColumnFamilyStore cfs, DecoratedKey key, String... names)
 +    public static AbstractReadCommandBuilder.PartitionRangeBuilder cmd(ColumnFamilyStore cfs)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        for (String str : names)
 -            s.add(cellname(str));
 -        return QueryFilter.getNamesFilter(key, cfs.name, s, System.currentTimeMillis());
 +        return new AbstractReadCommandBuilder.PartitionRangeBuilder(cfs);
      }
  
 -    public static QueryFilter namesQueryFilter(ColumnFamilyStore cfs, DecoratedKey key, CellName... names)
 +    static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        for (CellName n : names)
 -            s.add(n);
 -        return QueryFilter.getNamesFilter(key, cfs.name, s, System.currentTimeMillis());
 +        if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
 +            return (DecoratedKey)partitionKey[0];
 +
 +        ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
 +        return metadata.decorateKey(key);
      }
  
 -    public static NamesQueryFilter namesFilter(ColumnFamilyStore cfs, String... names)
 +    public static void assertEmptyUnfiltered(ReadCommand command)
      {
 -        SortedSet<CellName> s = new TreeSet<CellName>(cfs.getComparator());
 -        for (String str : names)
 -            s.add(cellname(str));
 -        return new NamesQueryFilter(s);
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
 +        {
 +            if (iterator.hasNext())
 +            {
 +                try (UnfilteredRowIterator partition = iterator.next())
 +                {
 +                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
 +                }
 +            }
 +        }
      }
  
 -    public static String string(ByteBuffer bb)
 +    public static void assertEmpty(ReadCommand command)
      {
 -        try
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); PartitionIterator iterator = command.executeInternal(orderGroup))
 +        {
 +            if (iterator.hasNext())
 +            {
 +                try (RowIterator partition = iterator.next())
 +                {
 +                    throw new AssertionError("Expected no results for query " + command.toCQLString() + " but got key " + command.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
 +                }
 +            }
 +        }
 +    }
 +
 +    public static List<ArrayBackedPartition> getAllUnfiltered(ReadCommand command)
 +    {
 +        List<ArrayBackedPartition> results = new ArrayList<>();
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
 +        {
 +            while (iterator.hasNext())
 +            {
 +                try (UnfilteredRowIterator partition = iterator.next())
 +                {
 +                    results.add(ArrayBackedPartition.create(partition));
 +                }
 +            }
 +        }
 +        return results;
 +    }
 +
 +    public static List<FilteredPartition> getAll(ReadCommand command)
 +    {
 +        List<FilteredPartition> results = new ArrayList<>();
 +        try (ReadOrderGroup orderGroup = command.startOrderGroup(); PartitionIterator iterator = command.executeInternal(orderGroup))
 +        {
 +            while (iterator.hasNext())
 +            {
 +                try (RowIterator partition = iterator.next())
 +                {
 +                    results.add(FilteredPartition.create(partition));
 +                }
 +            }
 +        }
 +        return results;
 +    }
 +
 +    public static Row getOnlyRowUnfiltered(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting one row in one partition but got nothing";
 +            try (UnfilteredRowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +
 +                assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
 +                Row row = ((Row)partition.next());
 +                assert !partition.hasNext() : "Expecting a single row but got more";
 +                return row;
 +            }
 +        }
 +    }
 +
 +    public static Row getOnlyRow(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator iterator = cmd.executeInternal(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting one row in one partition but got nothing";
 +            try (RowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +                assert partition.hasNext() : "Expecting one row in one partition but got an empty partition";
 +                Row row = partition.next();
 +                assert !partition.hasNext() : "Expecting a single row but got more";
 +                return row;
 +            }
 +        }
 +    }
 +
 +    public static ArrayBackedPartition getOnlyPartitionUnfiltered(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting a single partition but got nothing";
 +            try (UnfilteredRowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +                return ArrayBackedPartition.create(partition);
 +            }
 +        }
 +    }
 +
 +    public static FilteredPartition getOnlyPartition(ReadCommand cmd)
 +    {
 +        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator iterator = cmd.executeInternal(orderGroup))
 +        {
 +            assert iterator.hasNext() : "Expecting a single partition but got nothing";
 +            try (RowIterator partition = iterator.next())
 +            {
 +                assert !iterator.hasNext() : "Expecting a single partition but got more";
 +                return FilteredPartition.create(partition);
 +            }
 +        }
 +    }
 +
 +    public static UnfilteredRowIterator apply(Mutation mutation)
 +    {
 +        mutation.apply();
 +        assert mutation.getPartitionUpdates().size() == 1;
 +        return mutation.getPartitionUpdates().iterator().next().unfilteredIterator();
 +    }
 +
 +    public static Cell cell(ColumnFamilyStore cfs, Row row, String columnName)
 +    {
 +        ColumnDefinition def = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes(columnName));
 +        assert def != null;
 +        return row.getCell(def);
 +    }
 +
 +    public static Row row(Partition partition, Object... clustering)
 +    {
 +        return partition.getRow(partition.metadata().comparator.make(clustering));
 +    }
 +
 +    public static void assertCellValue(Object value, ColumnFamilyStore cfs, Row row, String columnName)
 +    {
 +        Cell cell = cell(cfs, row, columnName);
 +        assert cell != null : "Row " + row.toString(cfs.metadata) + " has no cell for " + columnName;
 +        assertEquals(value, cell.column().type.compose(cell.value()));
 +    }
 +
 +    public static void consume(UnfilteredRowIterator iter)
 +    {
 +        try (UnfilteredRowIterator iterator = iter)
          {
 -            return ByteBufferUtil.string(bb);
 +            while (iter.hasNext())
 +                iter.next();
          }
 -        catch (Exception e)
 +    }
 +
 +    public static int size(PartitionIterator iter)
 +    {
 +        int size = 0;
 +        while (iter.hasNext())
          {
 -            throw new RuntimeException(e);
 +            ++size;
 +            iter.next().close();
          }
 +        return size;
      }
  
 -    public static RangeTombstone tombstone(String start, String finish, long timestamp, int localtime)
 +    public static CBuilder getCBuilderForCFM(CFMetaData cfm)
      {
 -        Composite startName = CellNames.simpleDense(ByteBufferUtil.bytes(start));
 -        Composite endName = CellNames.simpleDense(ByteBufferUtil.bytes(finish));
 -        return new RangeTombstone(startName, endName, timestamp , localtime);
 +        List<ColumnDefinition> clusteringColumns = cfm.clusteringColumns();
 +        List<AbstractType<?>> types = new ArrayList<>(clusteringColumns.size());
 +        for (ColumnDefinition def : clusteringColumns)
 +            types.add(def.type);
 +        return CBuilder.create(new ClusteringComparator(types));
 +    }
 +
 +    public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b)
 +    {
 +        return Objects.equals(a.columns(), b.columns())
 +            && Objects.equals(a.metadata(), b.metadata())
 +            && Objects.equals(a.isReverseOrder(), b.isReverseOrder())
 +            && Objects.equals(a.partitionKey(), b.partitionKey())
 +            && Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion())
 +            && Objects.equals(a.staticRow(), b.staticRow())
 +            && Objects.equals(a.stats(), b.stats())
 +            && Iterators.elementsEqual(a, b);
 +    }
 +
 +    // moved & refactored from KeyspaceTest in < 3.0
 +    public static void assertColumns(Row row, String... expectedColumnNames)
 +    {
 +        Iterator<Cell> cells = row == null ? Iterators.<Cell>emptyIterator() : row.cells().iterator();
 +        String[] actual = Iterators.toArray(Iterators.transform(cells, new Function<Cell, String>()
 +        {
 +            public String apply(Cell cell)
 +            {
 +                return cell.column().name.toString();
 +            }
 +        }), String.class);
 +
 +        assert Arrays.equals(actual, expectedColumnNames)
 +        : String.format("Columns [%s])] is not expected [%s]",
 +                        ((row == null) ? "" : row.columns().toString()),
 +                        StringUtils.join(expectedColumnNames, ","));
 +    }
 +
 +    public static void assertColumn(CFMetaData cfm, Row row, String name, String value, long timestamp)
 +    {
 +        Cell cell = row.getCell(cfm.getColumnDefinition(new ColumnIdentifier(name, true)));
 +        assertColumn(cell, value, timestamp);
 +    }
 +
 +    public static void assertColumn(Cell cell, String value, long timestamp)
 +    {
 +        assertNotNull(cell);
 +        assertEquals(0, ByteBufferUtil.compareUnsigned(cell.value(), ByteBufferUtil.bytes(value)));
 +        assertEquals(timestamp, cell.timestamp());
 +    }
 +
 +    public static void assertClustering(CFMetaData cfm, Row row, Object... clusteringValue)
 +    {
 +        assertEquals(row.clustering().size(), clusteringValue.length);
 +        assertEquals(0, cfm.comparator.compare(row.clustering(), cfm.comparator.make(clusteringValue)));
 +    }
 +
-     public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
-     {
-         long now = System.currentTimeMillis();
-         while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
-         {
-             if (s.get().equals(expected))
-                 break;
-             Thread.yield();
-         }
-         assertEquals(expected, s.get());
-     }
- 
 +    public static PartitionerSwitcher switchPartitioner(IPartitioner p)
 +    {
 +        return new PartitionerSwitcher(p);
 +    }
 +
 +    public static class PartitionerSwitcher implements AutoCloseable
 +    {
 +        final IPartitioner oldP;
 +        final IPartitioner newP;
 +
 +        public PartitionerSwitcher(IPartitioner partitioner)
 +        {
 +            newP = partitioner;
 +            oldP = StorageService.instance.setPartitionerUnsafe(partitioner);
 +        }
 +
 +        public void close()
 +        {
 +            IPartitioner p = StorageService.instance.setPartitionerUnsafe(oldP);
 +            assert p == newP;
 +        }
      }
  
+     public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
+     {
+         long now = System.currentTimeMillis();
+         while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
+         {
+             if (s.get().equals(expected))
+                 break;
+             Thread.yield();
+         }
+         assertEquals(expected, s.get());
+     }
+ 
      public static void joinThread(Thread thread) throws InterruptedException
      {
          thread.join(10000);