You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/26 20:27:20 UTC
[02/10] cassandra git commit: Backport CASSANDRA-3569
Backport CASSANDRA-3569
patch by Omid Aladini; reviewed by yukim for CASSANDRA-9455
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e4ed966
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e4ed966
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e4ed966
Branch: refs/heads/cassandra-2.1
Commit: 3e4ed96663ed784e4cb1df17d4a9a7b2eff9e60b
Parents: 04287d4
Author: Omid Aladini <om...@gmail.com>
Authored: Tue May 26 13:16:50 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue May 26 13:26:27 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
debian/cassandra-sysctl.conf | 1 +
.../streaming/DefaultConnectionFactory.java | 1 +
.../cassandra/streaming/StreamSession.java | 22 ++------------------
4 files changed, 5 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0bff87b..af08802 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* Push correct protocol notification for DROP INDEX (CASSANDRA-9310)
* token-generator - generated tokens too long (CASSANDRA-9300)
* Add option not to validate atoms during scrub (CASSANDRA-9406)
+ * Backport CASSANDRA-3569 (CASSANDRA-9455)
2.0.15:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/debian/cassandra-sysctl.conf
----------------------------------------------------------------------
diff --git a/debian/cassandra-sysctl.conf b/debian/cassandra-sysctl.conf
index 2173765..443e83f 100644
--- a/debian/cassandra-sysctl.conf
+++ b/debian/cassandra-sysctl.conf
@@ -1 +1,2 @@
vm.max_map_count = 1048575
+net.ipv4.tcp_keepalive_time=300
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
index 53af4c8..f711490 100644
--- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -51,6 +51,7 @@ public class DefaultConnectionFactory implements StreamConnectionFactory
{
Socket socket = OutboundTcpConnectionPool.newSocket(peer);
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+ socket.setKeepAlive(true);
return socket;
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index db0c484..5f774e8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -109,7 +109,7 @@ import org.apache.cassandra.utils.Pair;
* session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
* send a CompleteMessage to the other side.
*/
-public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber
{
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
@@ -194,10 +194,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void init(StreamResultFuture streamResult)
{
this.streamResult = streamResult;
-
- // register to gossiper/FD to fail on node failure
- Gossiper.instance.register(this);
- FailureDetector.instance.registerFailureDetectionEventListener(this);
}
public void start()
@@ -372,8 +368,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
// incoming thread (so we would deadlock).
handler.close();
- Gossiper.instance.unregister(this);
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
streamResult.handleSessionComplete(this);
}
}
@@ -627,23 +621,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void onRemove(InetAddress endpoint)
{
- convict(endpoint, Double.MAX_VALUE);
+ closeSession(State.FAILED);
}
public void onRestart(InetAddress endpoint, EndpointState epState)
{
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void convict(InetAddress endpoint, double phi)
- {
- if (!endpoint.equals(peer))
- return;
-
- // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost (CASSANDRA-7063)
- if (phi < 100 * DatabaseDescriptor.getPhiConvictThreshold())
- return;
-
closeSession(State.FAILED);
}