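You are viewing a plain-text version of this content. The canonical link to the original message (lost in this plain-text rendering) points to the commits@cassandra.apache.org list archive.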
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/06/03 08:14:31 UTC
[4/4] git commit: Don't fail streams on failure detector downs
Don't fail streams on failure detector downs
Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-3569
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f2d7d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f2d7d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f2d7d0b
Branch: refs/heads/cassandra-2.1
Commit: 0f2d7d0b9540efa3ea3dfe4f8270c3635afdc63c
Parents: 878990c
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:11:56 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:11:56 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
debian/cassandra-sysctl.conf | 1 +
.../org/apache/cassandra/repair/RepairJob.java | 15 ++++++--
.../apache/cassandra/repair/RepairSession.java | 37 ++++++++++++++++----
.../cassandra/service/ActiveRepairService.java | 1 -
.../cassandra/streaming/ConnectionHandler.java | 1 +
.../cassandra/streaming/StreamSession.java | 22 ++----------
.../cassandra/streaming/StreamTransferTask.java | 12 +++++--
8 files changed, 58 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 333606a..b5c2feb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
2.1.0
* Upgrade to Pig 0.12.1 (CASSANDRA-6556)
* Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
+ * Don't fail streams on failure detector downs (CASSANDRA-3569)
Merged from 2.0:
* Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
* Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/debian/cassandra-sysctl.conf
----------------------------------------------------------------------
diff --git a/debian/cassandra-sysctl.conf b/debian/cassandra-sysctl.conf
index 2173765..443e83f 100644
--- a/debian/cassandra-sysctl.conf
+++ b/debian/cassandra-sysctl.conf
@@ -1 +1,2 @@
vm.max_map_count = 1048575
+net.ipv4.tcp_keepalive_time=300
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index af00403..8057ed5 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -58,11 +58,21 @@ public class RepairJob
/* Count down as sync completes */
private AtomicInteger waitForSync;
+ private final IRepairJobEventListener listener;
+
/**
* Create repair job to run on specific columnfamily
*/
- public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
+ public RepairJob(IRepairJobEventListener listener,
+ UUID parentSessionId,
+ UUID sessionId,
+ String keyspace,
+ String columnFamily,
+ Range<Token> range,
+ boolean isSequential,
+ ListeningExecutorService taskExecutor)
{
+ this.listener = listener;
this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
this.isSequential = isSequential;
this.taskExecutor = taskExecutor;
@@ -114,7 +124,8 @@ public class RepairJob
public void onFailure(Throwable throwable)
{
// TODO need to propagate error to RepairSession
- logger.error("Error while snapshot", throwable);
+ logger.error("Error occurred during snapshot phase", throwable);
+ listener.failedSnapshot();
failed = true;
}
}, taskExecutor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 507dafa..346f3f4 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -74,7 +74,9 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
* Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle
* all of them in parallel otherwise.
*/
-public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener,
+ IRepairJobEventListener
{
private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
@@ -89,9 +91,11 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
private volatile Exception exception;
private final AtomicBoolean isFailed = new AtomicBoolean(false);
+ private final AtomicBoolean fdUnregistered = new AtomicBoolean(false);
// First, all RepairJobs are added to this queue,
final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>();
+
// and after receiving all validation, the job is moved to
// this map, keyed by CF name.
final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
@@ -169,23 +173,32 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
assert job.desc.equals(desc);
if (job.addTree(endpoint, tree) == 0)
{
- logger.debug("All response received for {}/{}", getId(), desc.columnFamily);
+ logger.debug("All responses received for {}/{}", getId(), desc.columnFamily);
if (!job.isFailed())
{
syncingJobs.put(job.desc.columnFamily, job);
job.submitDifferencers();
}
- // This job is complete, switching to next in line (note that only
- // one thread will can ever do this)
+ // This job is complete, switching to next in line (note that only one thread will ever do this)
jobs.poll();
RepairJob nextJob = jobs.peek();
if (nextJob == null)
+ {
+ // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
+ // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
+ // See CASSANDRA-3569
+ if (fdUnregistered.compareAndSet(false, true))
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
// We are done with this repair session as far as differencing
// is considered. Just inform the session
differencingDone.signalAll();
+ }
else
+ {
nextJob.sendTreeRequests(endpoints);
+ }
}
}
@@ -271,7 +284,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
+ RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
jobs.offer(job);
}
logger.debug("Sending tree requests to endpoints {}", endpoints);
@@ -299,7 +312,13 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
{
// mark this session as terminated
terminate();
+
ActiveRepairService.instance.removeFromActiveSessions(this);
+
+ // If we've reached here in an exception state without completing Merkle Tree sync, we'll still be registered
+ // with the FailureDetector.
+ if (fdUnregistered.compareAndSet(false, true))
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
}
}
@@ -320,11 +339,17 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
completed.signalAll();
}
+ public void failedSnapshot()
+ {
+ exception = new IOException("Failed during snapshot creation.");
+ forceShutdown();
+ }
+
void failedNode(InetAddress remote)
{
String errorMsg = String.format("Endpoint %s died", remote);
exception = new IOException(errorMsg);
- // If a node failed, we stop everything (though there could still be some activity in the background)
+ // If a node failed during Merkle creation, we stop everything (though there could still be some activity in the background)
forceShutdown();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b300547..7f7325b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -135,7 +135,6 @@ public class ActiveRepairService
public void removeFromActiveSessions(RepairSession session)
{
- FailureDetector.instance.unregisterFailureDetectionEventListener(session);
Gossiper.instance.unregister(session);
sessions.remove(session.getId());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 5484c83..5716ae9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -123,6 +123,7 @@ public class ConnectionHandler
{
Socket socket = OutboundTcpConnectionPool.newSocket(peer);
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+ socket.setKeepAlive(true);
return socket;
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 411f969..1afc07e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -108,7 +108,7 @@ import org.apache.cassandra.utils.Pair;
* session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
* send a CompleteMessage to the other side.
*/
-public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber
{
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
public final InetAddress peer;
@@ -181,10 +181,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void init(StreamResultFuture streamResult)
{
this.streamResult = streamResult;
-
- // register to gossiper/FD to fail on node failure
- Gossiper.instance.register(this);
- FailureDetector.instance.registerFailureDetectionEventListener(this);
}
public void start()
@@ -358,8 +354,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
// incoming thread (so we would deadlock).
handler.close();
- Gossiper.instance.unregister(this);
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
streamResult.handleSessionComplete(this);
}
}
@@ -613,23 +607,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void onRemove(InetAddress endpoint)
{
- convict(endpoint, Double.MAX_VALUE);
+ closeSession(State.FAILED);
}
public void onRestart(InetAddress endpoint, EndpointState epState)
{
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void convict(InetAddress endpoint, double phi)
- {
- if (!endpoint.equals(peer))
- return;
-
- // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
- return;
-
closeSession(State.FAILED);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 2fe75fa..48a7d89 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
@@ -33,6 +34,7 @@ public class StreamTransferTask extends StreamTask
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+ private AtomicBoolean aborted = new AtomicBoolean(false);
private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
@@ -75,11 +77,15 @@ public class StreamTransferTask extends StreamTask
public void abort()
{
- for (OutgoingFileMessage file : files.values())
+ // Prevent releasing reference multiple times
+ if (aborted.compareAndSet(false, true))
{
- file.sstable.releaseReference();
+ for (OutgoingFileMessage file : files.values())
+ {
+ file.sstable.releaseReference();
+ }
+ timeoutExecutor.shutdownNow();
}
- timeoutExecutor.shutdownNow();
}
public int getTotalNumberOfFiles()