You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/26 20:27:21 UTC

[03/10] cassandra git commit: Backport CASSANDRA-3569

Backport CASSANDRA-3569

patch by Omid Aladini;reviewed by yukim for CASSANDRA-9455


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e4ed966
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e4ed966
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e4ed966

Branch: refs/heads/cassandra-2.2
Commit: 3e4ed96663ed784e4cb1df17d4a9a7b2eff9e60b
Parents: 04287d4
Author: Omid Aladini <om...@gmail.com>
Authored: Tue May 26 13:16:50 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue May 26 13:26:27 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 debian/cassandra-sysctl.conf                    |  1 +
 .../streaming/DefaultConnectionFactory.java     |  1 +
 .../cassandra/streaming/StreamSession.java      | 22 ++------------------
 4 files changed, 5 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0bff87b..af08802 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Push correct protocol notification for DROP INDEX (CASSANDRA-9310)
  * token-generator - generated tokens too long (CASSANDRA-9300)
  * Add option not to validate atoms during scrub (CASSANDRA-9406)
+ * Backport CASSANDRA-3569 (CASSANDRA-9455)
 
 
 2.0.15:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/debian/cassandra-sysctl.conf
----------------------------------------------------------------------
diff --git a/debian/cassandra-sysctl.conf b/debian/cassandra-sysctl.conf
index 2173765..443e83f 100644
--- a/debian/cassandra-sysctl.conf
+++ b/debian/cassandra-sysctl.conf
@@ -1 +1,2 @@
 vm.max_map_count = 1048575
+net.ipv4.tcp_keepalive_time=300

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
index 53af4c8..f711490 100644
--- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -51,6 +51,7 @@ public class DefaultConnectionFactory implements StreamConnectionFactory
             {
                 Socket socket = OutboundTcpConnectionPool.newSocket(peer);
                 socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+                socket.setKeepAlive(true);
                 return socket;
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4ed966/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index db0c484..5f774e8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -109,7 +109,7 @@ import org.apache.cassandra.utils.Pair;
  *       session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
  *       send a CompleteMessage to the other side.
  */
-public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
 
@@ -194,10 +194,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
     public void init(StreamResultFuture streamResult)
     {
         this.streamResult = streamResult;
-
-        // register to gossiper/FD to fail on node failure
-        Gossiper.instance.register(this);
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
     public void start()
@@ -372,8 +368,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             // incoming thread (so we would deadlock).
             handler.close();
 
-            Gossiper.instance.unregister(this);
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
             streamResult.handleSessionComplete(this);
         }
     }
@@ -627,23 +621,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
     public void onRemove(InetAddress endpoint)
     {
-        convict(endpoint, Double.MAX_VALUE);
+        closeSession(State.FAILED);
     }
 
     public void onRestart(InetAddress endpoint, EndpointState epState)
     {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void convict(InetAddress endpoint, double phi)
-    {
-        if (!endpoint.equals(peer))
-            return;
-
-        // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost (CASSANDRA-7063)
-        if (phi < 100 * DatabaseDescriptor.getPhiConvictThreshold())
-            return;
-
         closeSession(State.FAILED);
     }