You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/12/01 17:01:58 UTC

git commit: Remove some vestiges of the old streaming

Updated Branches:
  refs/heads/cassandra-2.0 f065cbfe0 -> 36f54f939


Remove some vestiges of the old streaming


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36f54f93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36f54f93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36f54f93

Branch: refs/heads/cassandra-2.0
Commit: 36f54f939caeeb57f4583bb526fe3b9f73296cf6
Parents: f065cbf
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Dec 1 18:59:25 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Dec 1 18:59:25 2013 +0300

----------------------------------------------------------------------
 .../apache/cassandra/net/MessagingService.java  | 29 --------------------
 .../cassandra/service/StorageService.java       |  2 --
 2 files changed, 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/36f54f93/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 36d5382..2259dbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -263,21 +263,6 @@ public final class MessagingService implements MessagingServiceMBean
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
-    /**
-     * One executor per destination InetAddress for streaming.
-     * <p/>
-     * See CASSANDRA-3494 for the background. We have streaming in place so we do not want to limit ourselves to
-     * one stream at a time for throttling reasons. But, we also do not want to just arbitrarily stream an unlimited
-     * amount of files at once because a single destination might have hundreds of files pending and it would cause a
-     * seek storm. So, transfer exactly one file per destination host. That puts a very natural rate limit on it, in
-     * addition to mapping well to the expected behavior in many cases.
-     * <p/>
-     * We will create our stream executors with a core size of 0 so that they time out and do not consume threads. This
-     * means the overhead in the degenerate case of having streamed to everyone in the ring over time as a ring changes,
-     * is not going to be a thread per node - but rather an instance per node. That's totally fine.
-     */
-    private final ConcurrentMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new NonBlockingHashMap<InetAddress, DebuggableThreadPoolExecutor>();
-
     private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
 
     private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
@@ -672,20 +657,6 @@ public final class MessagingService implements MessagingServiceMBean
         callbacks.reset();
     }
 
-    public void waitForStreaming() throws InterruptedException
-    {
-        // this does not prevent new streams from beginning after a drain begins, but since streams are only
-        // started in response to explicit operator action (bootstrap/move/repair/etc) that feels like a feature.
-        for (DebuggableThreadPoolExecutor e : streamExecutors.values())
-            e.shutdown();
-
-        for (DebuggableThreadPoolExecutor e : streamExecutors.values())
-        {
-            if (!e.awaitTermination(24, TimeUnit.HOURS))
-                logger.error("Stream took more than 24H to complete; skipping");
-        }
-    }
-
     /**
      * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36f54f93/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5b3a3e1..18ce97b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3217,8 +3217,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         setMode(Mode.DRAINING, "shutting down MessageService", false);
         MessagingService.instance().shutdown();
-        setMode(Mode.DRAINING, "waiting for streaming", false);
-        MessagingService.instance().waitForStreaming();
 
         setMode(Mode.DRAINING, "clearing mutation stage", false);
         mutationStage.shutdown();