You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/26 16:55:39 UTC

git commit: change SimpleCondition.signal to UOE; patch by Mikhail Mazursky; reviewed by jbellis for CASSANDRA-5691

Updated Branches:
  refs/heads/trunk 3455f1b76 -> 3ec4ff5ed


change SimpleCondition.signal to throw UnsupportedOperationException (UOE)
patch by Mikhail Mazursky; reviewed by jbellis for CASSANDRA-5691


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ec4ff5e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ec4ff5e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ec4ff5e

Branch: refs/heads/trunk
Commit: 3ec4ff5ed3f362fc77f25c31ae16afbb46030624
Parents: 3455f1b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jun 26 07:55:28 2013 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jun 26 07:55:28 2013 -0700

----------------------------------------------------------------------
 .../compaction/ParallelCompactionIterable.java  | 26 +++++++++++++-------
 .../service/AbstractWriteResponseHandler.java   |  2 +-
 .../apache/cassandra/service/ReadCallback.java  |  2 +-
 .../service/TruncateResponseHandler.java        |  2 +-
 .../apache/cassandra/utils/SimpleCondition.java |  5 ++--
 5 files changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 1d380f6..a8baf7a 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -195,7 +195,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
         private class DeserializedColumnIterator implements OnDiskAtomIterator
         {
             private final Row row;
-            private Iterator<Column> iter;
+            private final Iterator<Column> iter;
 
             public DeserializedColumnIterator(Row row)
             {
@@ -236,7 +236,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
     {
         private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
         private static final RowContainer finished = new RowContainer((Row) null);
-        private Condition condition;
         private final ICompactionScanner scanner;
 
         public Deserializer(ICompactionScanner ssts, final int maxInMemorySize)
@@ -246,11 +245,14 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             {
                 protected void runMayThrow() throws Exception
                 {
+                    SimpleCondition condition = null;
                     while (true)
                     {
                         if (condition != null)
+                        {
                             condition.await();
-
+                            condition = null;
+                        }
                         if (!scanner.hasNext())
                         {
                             queue.put(finished);
@@ -260,13 +262,13 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                         SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next();
                         if (iter.dataSize > maxInMemorySize)
                         {
-                            logger.debug("parallel lazy deserialize from " + iter.getPath());
+                            logger.debug("parallel lazy deserialize from {}", iter.getPath());
                             condition = new SimpleCondition();
                             queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition)));
                         }
                         else
                         {
-                            logger.debug("parallel eager deserialize from " + iter.getPath());
+                            logger.debug("parallel eager deserialize from {}", iter.getPath());
                             queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory))));
                         }
                     }
@@ -301,9 +303,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
     private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
     {
         private final SSTableIdentityIterator wrapped;
-        private final Condition condition;
+        private final SimpleCondition condition;
 
-        public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, Condition condition)
+        public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, SimpleCondition condition)
         {
             this.wrapped = wrapped;
             this.condition = condition;
@@ -321,8 +323,14 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
 
         public void close() throws IOException
         {
-            wrapped.close();
-            condition.signal();
+            try
+            {
+                wrapped.close();
+            }
+            finally
+            {
+                condition.signalAll();
+            }
         }
 
         public boolean hasNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 7be4c29..1740ee2 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -100,7 +100,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
 
     protected void signal()
     {
-        condition.signal();
+        condition.signalAll();
         if (callback != null)
             callback.run();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 7cb5a23..bd8b025 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -117,7 +117,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
               : received.get();
         if (n >= blockfor && resolver.isDataPresent())
         {
-            condition.signal();
+            condition.signalAll();
             maybeResolveForRepair();
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index 3920b91..3bacad8 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -70,7 +70,7 @@ public class TruncateResponseHandler implements IAsyncCallback
     {
         responses.incrementAndGet();
         if (responses.get() >= responseCount)
-            condition.signal();
+            condition.signalAll();
     }
 
     public boolean isLatencyForSnitch()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/utils/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SimpleCondition.java b/src/java/org/apache/cassandra/utils/SimpleCondition.java
index 6086f3b..4d5f896 100644
--- a/src/java/org/apache/cassandra/utils/SimpleCondition.java
+++ b/src/java/org/apache/cassandra/utils/SimpleCondition.java
@@ -51,10 +51,9 @@ public class SimpleCondition implements Condition
         return set;
     }
 
-    public synchronized void signal()
+    public void signal()
     {
-        set = true;
-        notify();
+        throw new UnsupportedOperationException();
     }
 
     public synchronized void signalAll()