You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/26 16:55:39 UTC
git commit: change SimpleCondition.signal to UOE; patch by Mikhail
Mazursky; reviewed by jbellis for CASSANDRA-5691
Updated Branches:
refs/heads/trunk 3455f1b76 -> 3ec4ff5ed
change SimpleCondition.signal to throw UnsupportedOperationException (UOE)
patch by Mikhail Mazursky; reviewed by jbellis for CASSANDRA-5691
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ec4ff5e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ec4ff5e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ec4ff5e
Branch: refs/heads/trunk
Commit: 3ec4ff5ed3f362fc77f25c31ae16afbb46030624
Parents: 3455f1b
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jun 26 07:55:28 2013 -0700
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jun 26 07:55:28 2013 -0700
----------------------------------------------------------------------
.../compaction/ParallelCompactionIterable.java | 26 +++++++++++++-------
.../service/AbstractWriteResponseHandler.java | 2 +-
.../apache/cassandra/service/ReadCallback.java | 2 +-
.../service/TruncateResponseHandler.java | 2 +-
.../apache/cassandra/utils/SimpleCondition.java | 5 ++--
5 files changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 1d380f6..a8baf7a 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -195,7 +195,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private class DeserializedColumnIterator implements OnDiskAtomIterator
{
private final Row row;
- private Iterator<Column> iter;
+ private final Iterator<Column> iter;
public DeserializedColumnIterator(Row row)
{
@@ -236,7 +236,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
{
private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
private static final RowContainer finished = new RowContainer((Row) null);
- private Condition condition;
private final ICompactionScanner scanner;
public Deserializer(ICompactionScanner ssts, final int maxInMemorySize)
@@ -246,11 +245,14 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
{
protected void runMayThrow() throws Exception
{
+ SimpleCondition condition = null;
while (true)
{
if (condition != null)
+ {
condition.await();
-
+ condition = null;
+ }
if (!scanner.hasNext())
{
queue.put(finished);
@@ -260,13 +262,13 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next();
if (iter.dataSize > maxInMemorySize)
{
- logger.debug("parallel lazy deserialize from " + iter.getPath());
+ logger.debug("parallel lazy deserialize from {}", iter.getPath());
condition = new SimpleCondition();
queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition)));
}
else
{
- logger.debug("parallel eager deserialize from " + iter.getPath());
+ logger.debug("parallel eager deserialize from {}", iter.getPath());
queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory))));
}
}
@@ -301,9 +303,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
{
private final SSTableIdentityIterator wrapped;
- private final Condition condition;
+ private final SimpleCondition condition;
- public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, Condition condition)
+ public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, SimpleCondition condition)
{
this.wrapped = wrapped;
this.condition = condition;
@@ -321,8 +323,14 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
public void close() throws IOException
{
- wrapped.close();
- condition.signal();
+ try
+ {
+ wrapped.close();
+ }
+ finally
+ {
+ condition.signalAll();
+ }
}
public boolean hasNext()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 7be4c29..1740ee2 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -100,7 +100,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
protected void signal()
{
- condition.signal();
+ condition.signalAll();
if (callback != null)
callback.run();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 7cb5a23..bd8b025 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -117,7 +117,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
: received.get();
if (n >= blockfor && resolver.isDataPresent())
{
- condition.signal();
+ condition.signalAll();
maybeResolveForRepair();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index 3920b91..3bacad8 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -70,7 +70,7 @@ public class TruncateResponseHandler implements IAsyncCallback
{
responses.incrementAndGet();
if (responses.get() >= responseCount)
- condition.signal();
+ condition.signalAll();
}
public boolean isLatencyForSnitch()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ec4ff5e/src/java/org/apache/cassandra/utils/SimpleCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SimpleCondition.java b/src/java/org/apache/cassandra/utils/SimpleCondition.java
index 6086f3b..4d5f896 100644
--- a/src/java/org/apache/cassandra/utils/SimpleCondition.java
+++ b/src/java/org/apache/cassandra/utils/SimpleCondition.java
@@ -51,10 +51,9 @@ public class SimpleCondition implements Condition
return set;
}
- public synchronized void signal()
+ public void signal()
{
- set = true;
- notify();
+ throw new UnsupportedOperationException();
}
public synchronized void signalAll()