You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/06/03 08:14:40 UTC
[1/7] git commit: Fix infinite loop on exception while streaming
Repository: cassandra
Updated Branches:
refs/heads/trunk ac8d3c174 -> 0a9845203
Fix infinite loop on exception while streaming
Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-7330
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58bb974a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58bb974a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58bb974a
Branch: refs/heads/trunk
Commit: 58bb974a330ab71d2ee34cace565f489e26629c7
Parents: 62d9c43
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 07:54:21 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:01:34 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/StreamReader.java | 15 +++++++++++++--
2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58bb974a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc95a8d..37105f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
* Add authentication support to shuffle (CASSANDRA-6484)
* Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
* Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
+ * Fix infinite loop on exception while streaming (CASSANDRA-7330)
Merged from 1.2:
* Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
* Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58bb974a/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 72c239c..15aa3cb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -114,9 +114,20 @@ public class StreamReader
protected void drain(InputStream dis, long bytesRead) throws IOException
{
long toSkip = totalSize() - bytesRead;
- toSkip = toSkip - dis.skip(toSkip);
+
+ // InputStream.skip can return -1 if dis is inaccessible.
+ long skipped = dis.skip(toSkip);
+ if (skipped == -1)
+ return;
+
+ toSkip = toSkip - skipped;
while (toSkip > 0)
- toSkip = toSkip - dis.skip(toSkip);
+ {
+ skipped = dis.skip(toSkip);
+ if (skipped == -1)
+ break;
+ toSkip = toSkip - skipped;
+ }
}
protected long totalSize()
[3/7] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b477b45
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b477b45
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b477b45
Branch: refs/heads/trunk
Commit: 4b477b45981de09f23a13186e015b26870bfd514
Parents: ac8d3c1 d0c90e0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:02:17 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:02:17 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/StreamReader.java | 15 +++++++++++++--
2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b477b45/CHANGES.txt
----------------------------------------------------------------------
[7/7] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a984520
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a984520
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a984520
Branch: refs/heads/trunk
Commit: 0a9845203a2b6eeca0dbe77acade6d749f0682f6
Parents: 4b437b4 0f2d7d0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:13:52 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:13:52 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
debian/cassandra-sysctl.conf | 1 +
.../org/apache/cassandra/repair/RepairJob.java | 15 ++++++--
.../apache/cassandra/repair/RepairSession.java | 37 ++++++++++++++++----
.../cassandra/service/ActiveRepairService.java | 1 -
.../cassandra/streaming/ConnectionHandler.java | 1 +
.../cassandra/streaming/StreamSession.java | 22 ++----------
.../cassandra/streaming/StreamTransferTask.java | 12 +++++--
8 files changed, 58 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a984520/CHANGES.txt
----------------------------------------------------------------------
[5/7] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b437b4e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b437b4e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b437b4e
Branch: refs/heads/trunk
Commit: 4b437b4e9dd8ae65fde1b6cb04a492811dd5b120
Parents: 4b477b4 878990c
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:05:33 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:05:33 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/streaming/StreamCoordinator.java | 5 +++++
src/java/org/apache/cassandra/streaming/StreamResultFuture.java | 2 +-
3 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b437b4e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0db4926,333606a..c7da943
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,6 +1,16 @@@
+3.0
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7208)
+ * Make incremental repair default (CASSANDRA-7250)
+
+
2.1.0
* Upgrade to Pig 0.12.1 (CASSANDRA-6556)
+ * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
Merged from 2.0:
* Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
* Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
[4/7] git commit: Make sure we clear out repair sessions from netstats
Posted by ma...@apache.org.
Make sure we clear out repair sessions from netstats
Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-7329
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/878990cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/878990cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/878990cd
Branch: refs/heads/trunk
Commit: 878990cde853d2e43f1d1da107878ddafcef6262
Parents: d0c90e0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:04:52 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:04:52 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/streaming/StreamCoordinator.java | 5 +++++
src/java/org/apache/cassandra/streaming/StreamResultFuture.java | 2 +-
3 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/878990cd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 188326d..333606a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.1.0
* Upgrade to Pig 0.12.1 (CASSANDRA-6556)
+ * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
Merged from 2.0:
* Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
* Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/878990cd/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index ed94f89..4a6b193 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -73,6 +73,11 @@ public class StreamCoordinator
return results;
}
+ public boolean isReceiving()
+ {
+ return connectionsPerHost == 0;
+ }
+
public void connectAllStreamSessions()
{
for (HostStreamingData data : peerSessions.values())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/878990cd/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index cc7abbb..6a143d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -65,7 +65,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
this.coordinator = coordinator;
// if there is no session to listen to, we immediately set result for returning
- if (!coordinator.hasActiveSessions())
+ if (!coordinator.isReceiving() && !coordinator.hasActiveSessions())
set(getCurrentState());
}
[6/7] git commit: Don't fail streams on failure detector downs
Posted by ma...@apache.org.
Don't fail streams on failure detector downs
Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-3569
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f2d7d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f2d7d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f2d7d0b
Branch: refs/heads/trunk
Commit: 0f2d7d0b9540efa3ea3dfe4f8270c3635afdc63c
Parents: 878990c
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:11:56 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:11:56 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
debian/cassandra-sysctl.conf | 1 +
.../org/apache/cassandra/repair/RepairJob.java | 15 ++++++--
.../apache/cassandra/repair/RepairSession.java | 37 ++++++++++++++++----
.../cassandra/service/ActiveRepairService.java | 1 -
.../cassandra/streaming/ConnectionHandler.java | 1 +
.../cassandra/streaming/StreamSession.java | 22 ++----------
.../cassandra/streaming/StreamTransferTask.java | 12 +++++--
8 files changed, 58 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 333606a..b5c2feb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
2.1.0
* Upgrade to Pig 0.12.1 (CASSANDRA-6556)
* Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
+ * Don't fail streams on failure detector downs (CASSANDRA-3569)
Merged from 2.0:
* Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
* Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/debian/cassandra-sysctl.conf
----------------------------------------------------------------------
diff --git a/debian/cassandra-sysctl.conf b/debian/cassandra-sysctl.conf
index 2173765..443e83f 100644
--- a/debian/cassandra-sysctl.conf
+++ b/debian/cassandra-sysctl.conf
@@ -1 +1,2 @@
vm.max_map_count = 1048575
+net.ipv4.tcp_keepalive_time=300
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index af00403..8057ed5 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -58,11 +58,21 @@ public class RepairJob
/* Count down as sync completes */
private AtomicInteger waitForSync;
+ private final IRepairJobEventListener listener;
+
/**
* Create repair job to run on specific columnfamily
*/
- public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
+ public RepairJob(IRepairJobEventListener listener,
+ UUID parentSessionId,
+ UUID sessionId,
+ String keyspace,
+ String columnFamily,
+ Range<Token> range,
+ boolean isSequential,
+ ListeningExecutorService taskExecutor)
{
+ this.listener = listener;
this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
this.isSequential = isSequential;
this.taskExecutor = taskExecutor;
@@ -114,7 +124,8 @@ public class RepairJob
public void onFailure(Throwable throwable)
{
// TODO need to propagate error to RepairSession
- logger.error("Error while snapshot", throwable);
+ logger.error("Error occurred during snapshot phase", throwable);
+ listener.failedSnapshot();
failed = true;
}
}, taskExecutor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 507dafa..346f3f4 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -74,7 +74,9 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
* Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle
* all of them in parallel otherwise.
*/
-public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener,
+ IRepairJobEventListener
{
private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
@@ -89,9 +91,11 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
private volatile Exception exception;
private final AtomicBoolean isFailed = new AtomicBoolean(false);
+ private final AtomicBoolean fdUnregistered = new AtomicBoolean(false);
// First, all RepairJobs are added to this queue,
final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>();
+
// and after receiving all validation, the job is moved to
// this map, keyed by CF name.
final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
@@ -169,23 +173,32 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
assert job.desc.equals(desc);
if (job.addTree(endpoint, tree) == 0)
{
- logger.debug("All response received for {}/{}", getId(), desc.columnFamily);
+ logger.debug("All responses received for {}/{}", getId(), desc.columnFamily);
if (!job.isFailed())
{
syncingJobs.put(job.desc.columnFamily, job);
job.submitDifferencers();
}
- // This job is complete, switching to next in line (note that only
- // one thread will can ever do this)
+ // This job is complete, switching to next in line (note that only one thread will ever do this)
jobs.poll();
RepairJob nextJob = jobs.peek();
if (nextJob == null)
+ {
+ // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
+ // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
+ // See CASSANDRA-3569
+ if (fdUnregistered.compareAndSet(false, true))
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
// We are done with this repair session as far as differencing
// is considered. Just inform the session
differencingDone.signalAll();
+ }
else
+ {
nextJob.sendTreeRequests(endpoints);
+ }
}
}
@@ -271,7 +284,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
+ RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
jobs.offer(job);
}
logger.debug("Sending tree requests to endpoints {}", endpoints);
@@ -299,7 +312,13 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
{
// mark this session as terminated
terminate();
+
ActiveRepairService.instance.removeFromActiveSessions(this);
+
+ // If we've reached here in an exception state without completing Merkle Tree sync, we'll still be registered
+ // with the FailureDetector.
+ if (fdUnregistered.compareAndSet(false, true))
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
}
}
@@ -320,11 +339,17 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
completed.signalAll();
}
+ public void failedSnapshot()
+ {
+ exception = new IOException("Failed during snapshot creation.");
+ forceShutdown();
+ }
+
void failedNode(InetAddress remote)
{
String errorMsg = String.format("Endpoint %s died", remote);
exception = new IOException(errorMsg);
- // If a node failed, we stop everything (though there could still be some activity in the background)
+ // If a node failed during Merkle creation, we stop everything (though there could still be some activity in the background)
forceShutdown();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b300547..7f7325b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -135,7 +135,6 @@ public class ActiveRepairService
public void removeFromActiveSessions(RepairSession session)
{
- FailureDetector.instance.unregisterFailureDetectionEventListener(session);
Gossiper.instance.unregister(session);
sessions.remove(session.getId());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 5484c83..5716ae9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -123,6 +123,7 @@ public class ConnectionHandler
{
Socket socket = OutboundTcpConnectionPool.newSocket(peer);
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+ socket.setKeepAlive(true);
return socket;
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 411f969..1afc07e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -108,7 +108,7 @@ import org.apache.cassandra.utils.Pair;
* session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
* send a CompleteMessage to the other side.
*/
-public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber
{
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
public final InetAddress peer;
@@ -181,10 +181,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void init(StreamResultFuture streamResult)
{
this.streamResult = streamResult;
-
- // register to gossiper/FD to fail on node failure
- Gossiper.instance.register(this);
- FailureDetector.instance.registerFailureDetectionEventListener(this);
}
public void start()
@@ -358,8 +354,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
// incoming thread (so we would deadlock).
handler.close();
- Gossiper.instance.unregister(this);
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
streamResult.handleSessionComplete(this);
}
}
@@ -613,23 +607,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void onRemove(InetAddress endpoint)
{
- convict(endpoint, Double.MAX_VALUE);
+ closeSession(State.FAILED);
}
public void onRestart(InetAddress endpoint, EndpointState epState)
{
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void convict(InetAddress endpoint, double phi)
- {
- if (!endpoint.equals(peer))
- return;
-
- // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
- return;
-
closeSession(State.FAILED);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 2fe75fa..48a7d89 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
@@ -33,6 +34,7 @@ public class StreamTransferTask extends StreamTask
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+ private AtomicBoolean aborted = new AtomicBoolean(false);
private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
@@ -75,11 +77,15 @@ public class StreamTransferTask extends StreamTask
public void abort()
{
- for (OutgoingFileMessage file : files.values())
+ // Prevent releasing reference multiple times
+ if (aborted.compareAndSet(false, true))
{
- file.sstable.releaseReference();
+ for (OutgoingFileMessage file : files.values())
+ {
+ file.sstable.releaseReference();
+ }
+ timeoutExecutor.shutdownNow();
}
- timeoutExecutor.shutdownNow();
}
public int getTotalNumberOfFiles()
[2/7] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0c90e07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0c90e07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0c90e07
Branch: refs/heads/trunk
Commit: d0c90e07ab3fea2aff80e73fa3bc58bef756ad68
Parents: 371507a 58bb974
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:01:50 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:01:50 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/StreamReader.java | 15 +++++++++++++--
2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0c90e07/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c7c6877,37105f3..188326d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,19 +1,10 @@@
-2.0.9
+2.1.0
+ * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
+Merged from 2.0:
* Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
- * Add conditional CREATE/DROP USER support (CASSANDRA-7264)
- * Swap local and global default read repair chances (CASSANDRA-7320)
- * Add missing iso8601 patterns for date strings (CASSANDRA-6973)
- * Support selecting multiple rows in a partition using IN (CASSANDRA-6875)
- * cqlsh: always emphasize the partition key in DESC output (CASSANDRA-7274)
- * Copy compaction options to make sure they are reloaded (CASSANDRA-7290)
- * Add option to do more aggressive tombstone compactions (CASSANDRA-6563)
- * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288)
- * Add authentication support to shuffle (CASSANDRA-6484)
- * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
* Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
+ * Fix infinite loop on exception while streaming (CASSANDRA-7330)
Merged from 1.2:
- * Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
* Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0c90e07/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------