You are viewing a plain-text version of this content. The canonical link for it was provided as a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/08/16 07:17:48 UTC
[1/4] git commit: Fix race condition in StreamTransferTask that could
lead to infinite loops and premature sstable deletion
Repository: cassandra
Updated Branches:
refs/heads/trunk ba22af17c -> fe0572778
Fix race condition in StreamTransferTask that could lead to
infinite loops and premature sstable deletion
patch by benedict; reviewed by yukim for CASSANDRA-7704
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8e42d5fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8e42d5fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8e42d5fe
Branch: refs/heads/trunk
Commit: 8e42d5fe86f6b2f4dd636ed77793d6ab69792895
Parents: 115bbe4
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sat Aug 16 12:11:45 2014 +0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sat Aug 16 12:11:45 2014 +0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cassandra/streaming/StreamTransferTask.java | 75 +++++++++++++-------
.../streaming/StreamTransferTaskTest.java | 21 ++++--
3 files changed, 65 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e42d5fe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 987c227..cf4a115 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.10
+ * Fix race condition in StreamTransferTask that could lead to
+ infinite loops and premature sstable deletion (CASSANDRA-7704)
* (cqlsh) Wait up to 10 sec for a tracing session (CASSANDRA-7222)
* Fix NPE in FileCacheService.sizeInBytes (CASSANDRA-7756)
* (cqlsh) cqlsh should automatically disable tracing when selecting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e42d5fe/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index a543d01..629c6bb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -19,8 +19,12 @@ package org.apache.cassandra.streaming;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import io.netty.util.concurrent.*;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.Pair;
@@ -30,13 +34,13 @@ import org.apache.cassandra.utils.Pair;
*/
public class StreamTransferTask extends StreamTask
{
- private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+ private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts"));
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+ private boolean aborted = false;
- private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
-
- private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>();
+ private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+ private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
private long totalSize;
@@ -45,7 +49,7 @@ public class StreamTransferTask extends StreamTask
super(session, cfId);
}
- public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
+ public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
{
assert sstable != null && cfId.equals(sstable.metadata.cfId);
OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections);
@@ -58,31 +62,42 @@ public class StreamTransferTask extends StreamTask
*
* @param sequenceNumber sequence number of file
*/
- public synchronized void complete(int sequenceNumber)
+ public void complete(int sequenceNumber)
{
- OutgoingFileMessage file = files.remove(sequenceNumber);
- if (file != null)
+ boolean signalComplete;
+ synchronized (this)
{
- file.sstable.releaseReference();
- // all file sent, notify session this task is complete.
- if (files.isEmpty())
- {
- timeoutExecutor.shutdownNow();
- session.taskCompleted(this);
- }
+ ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber);
+ if (timeout != null)
+ timeout.cancel(false);
+
+ OutgoingFileMessage file = files.remove(sequenceNumber);
+ if (file != null)
+ file.sstable.releaseReference();
+
+ signalComplete = files.isEmpty();
}
+
+ // all file sent, notify session this task is complete.
+ if (signalComplete)
+ session.taskCompleted(this);
}
- public void abort()
+ public synchronized void abort()
{
+ if (aborted)
+ return;
+ aborted = true;
+
+ for (ScheduledFuture future : timeoutTasks.values())
+ future.cancel(false);
+ timeoutTasks.clear();
+
for (OutgoingFileMessage file : files.values())
- {
file.sstable.releaseReference();
- }
- timeoutExecutor.shutdownNow();
}
- public int getTotalNumberOfFiles()
+ public synchronized int getTotalNumberOfFiles()
{
return files.size();
}
@@ -92,17 +107,17 @@ public class StreamTransferTask extends StreamTask
return totalSize;
}
- public Collection<OutgoingFileMessage> getFileMessages()
+ public synchronized Collection<OutgoingFileMessage> getFileMessages()
{
// We may race between queuing all those messages and the completion of the completion of
- // the first ones. So copy the values to avoid a ConcurrentModificationException
+ // the first ones. So copy tthe values to avoid a ConcurrentModificationException
return new ArrayList<>(files.values());
}
public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber)
{
// remove previous time out task to be rescheduled later
- ScheduledFuture future = timeoutTasks.get(sequenceNumber);
+ ScheduledFuture future = timeoutTasks.remove(sequenceNumber);
if (future != null)
future.cancel(false);
return files.get(sequenceNumber);
@@ -120,18 +135,24 @@ public class StreamTransferTask extends StreamTask
*/
public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit)
{
- if (timeoutExecutor.isShutdown())
+ if (!files.containsKey(sequenceNumber))
return null;
ScheduledFuture future = timeoutExecutor.schedule(new Runnable()
{
public void run()
{
- StreamTransferTask.this.complete(sequenceNumber);
- timeoutTasks.remove(sequenceNumber);
+ synchronized (StreamTransferTask.this)
+ {
+ // remove so we don't cancel ourselves
+ timeoutTasks.remove(sequenceNumber);
+ StreamTransferTask.this.complete(sequenceNumber);
+ }
}
}, time, unit);
- timeoutTasks.put(sequenceNumber, future);
+
+ ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future);
+ assert prev == null;
return future;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8e42d5fe/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index ce0f9d0..cc41a8b 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -19,11 +19,14 @@ package org.apache.cassandra.streaming;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
+import junit.framework.Assert;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -43,7 +46,7 @@ public class StreamTransferTaskTest extends SchemaLoader
String ks = "Keyspace1";
String cf = "Standard1";
- StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null);
+ StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0);
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
// create two sstables
@@ -59,19 +62,25 @@ public class StreamTransferTaskTest extends SchemaLoader
{
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
- task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges));
+ task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges), 0);
}
assertEquals(2, task.getTotalNumberOfFiles());
// if file sending completes before timeout then the task should be canceled.
- ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS);
- task.complete(0);
- // timeout task may run after complete but it is noop
+ Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS);
f.get();
// when timeout runs on second file, task should be completed
f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS);
- f.get();
+ task.complete(1);
+ try
+ {
+ f.get();
+ Assert.assertTrue(false);
+ }
+ catch (CancellationException ex)
+ {
+ }
assertEquals(StreamSession.State.WAIT_COMPLETE, session.state());
// when all streaming are done, time out task should not be scheduled.
[4/4] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe057277
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe057277
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe057277
Branch: refs/heads/trunk
Commit: fe0572778cd4449d3c3d850931f9d41a454b8994
Parents: ba22af1 8a69ef8
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sat Aug 16 12:17:28 2014 +0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sat Aug 16 12:17:28 2014 +0700
----------------------------------------------------------------------
CHANGES.txt | 5 ++
.../cassandra/streaming/StreamTransferTask.java | 83 ++++++++++++--------
.../streaming/StreamTransferTaskTest.java | 17 +++-
3 files changed, 67 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe057277/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe057277/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 5d246f4,cc41a8b..16fa77b
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -22,11 -24,10 +24,12 @@@ import java.util.concurrent.Future
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import org.junit.BeforeClass;
import org.junit.Test;
+ import junit.framework.Assert;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
[2/4] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0
Posted by be...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/streaming/StreamTransferTask.java
test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c01df314
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c01df314
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c01df314
Branch: refs/heads/trunk
Commit: c01df31443c70cdaedd0ff55c0da16c72dbdf019
Parents: d087317 8e42d5f
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sat Aug 16 12:17:06 2014 +0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sat Aug 16 12:17:06 2014 +0700
----------------------------------------------------------------------
CHANGES.txt | 5 ++
.../cassandra/streaming/StreamTransferTask.java | 83 ++++++++++++--------
.../streaming/StreamTransferTaskTest.java | 17 +++-
3 files changed, 67 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c01df314/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eeb115f,cf4a115..f6d35d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,13 +1,29 @@@
+2.1.0-rc6
+ * Fix OOM issue from netty caching over time (CASSANDRA-7743)
+ * json2sstable couldn't import JSON for CQL table (CASSANDRA-7477)
+ * Invalidate all caches on table drop (CASSANDRA-7561)
+ * Skip strict endpoint selection for ranges if RF == nodes (CASSANRA-7765)
+ * Fix Thrift range filtering without 2ary index lookups (CASSANDRA-7741)
+ * Add tracing entries about concurrent range requests (CASSANDRA-7599)
+ * (cqlsh) Fix DESCRIBE for NTS keyspaces (CASSANDRA-7729)
+ * Remove netty buffer ref-counting (CASSANDRA-7735)
+ * Pass mutated cf to index updater for use by PRSI (CASSANDRA-7742)
+ * Include stress yaml example in release and deb (CASSANDRA-7717)
+ * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
+ * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
+ * Fix binding null values inside UDT (CASSANDRA-7685)
+ * Fix UDT field selection with empty fields (CASSANDRA-7670)
+ * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
+ * Fix NPE on compaction leftover cleanup for dropped table (CASSANDRA-7770)
+Merged from 2.0:
++=======
+ 2.0.10
+ * Fix race condition in StreamTransferTask that could lead to
+ infinite loops and premature sstable deletion (CASSANDRA-7704)
+ * (cqlsh) Wait up to 10 sec for a tracing session (CASSANDRA-7222)
* Fix NPE in FileCacheService.sizeInBytes (CASSANDRA-7756)
- * (cqlsh) cqlsh should automatically disable tracing when selecting
- from system_traces (CASSANDRA-7641)
- * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
- * Don't depend on cassandra config for nodetool ring (CASSANDRA-7508)
- * (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703)
+ * Remove duplicates from StorageService.getJoiningNodes (CASSANDRA-7478)
+ * Clone token map outside of hot gossip loops (CASSANDRA-7758)
* Fix MS expiring map timeout for Paxos messages (CASSANDRA-7752)
* Do not flush on truncate if durable_writes is false (CASSANDRA-7750)
* Give CRR a default input_cql Statement (CASSANDRA-7226)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c01df314/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 48a7d89,629c6bb..6e8c886
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -47,10 -49,10 +49,10 @@@ public class StreamTransferTask extend
super(session, cfId);
}
- public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
- public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
++ public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
assert sstable != null && cfId.equals(sstable.metadata.cfId);
- OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections);
+ OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
files.put(message.header.sequenceNumber, message);
totalSize += message.header.size();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c01df314/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
[3/4] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by be...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a69ef89
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a69ef89
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a69ef89
Branch: refs/heads/trunk
Commit: 8a69ef89892f2a7ffb4f9e67c48d0d8c1ea893b4
Parents: ff9c631 c01df31
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sat Aug 16 12:17:17 2014 +0700
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sat Aug 16 12:17:17 2014 +0700
----------------------------------------------------------------------
CHANGES.txt | 5 ++
.../cassandra/streaming/StreamTransferTask.java | 83 ++++++++++++--------
.../streaming/StreamTransferTaskTest.java | 17 +++-
3 files changed, 67 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a69ef89/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a69ef89/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------