You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/12/09 12:37:53 UTC

[cassandra] branch trunk updated: Add dedicated tcp user timeout for streaming connection

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c6ea8fa  Add dedicated tcp user timeout for streaming connection
c6ea8fa is described below

commit c6ea8fabf9f726a58b76ff6e41d21b650941f880
Author: yifan-c <yc...@gmail.com>
AuthorDate: Tue Nov 17 16:06:03 2020 -0800

    Add dedicated tcp user timeout for streaming connection
    
    patch by Yifan Cai; reviewed by Berenguer Blasi, David Capwell, Adam
    Holmberg and by Benjamin Lerer for CASSANDRA-16143
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   4 +
 doc/source/configuration/cass_yaml_file.rst        |  24 +++-
 doc/source/operating/metrics.rst                   |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   7 +-
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../apache/cassandra/metrics/StreamingMetrics.java |   4 +
 .../cassandra/net/OutboundConnectionSettings.java  |  16 ++-
 .../apache/cassandra/service/StorageService.java   |  12 ++
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../apache/cassandra/streaming/StreamSession.java  |  23 +++-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   5 +
 .../cassandra/tools/nodetool/GetTimeout.java       |   2 +-
 .../cassandra/distributed/test/NodeToolTest.java   |  49 +++++++--
 .../test/metrics/StreamingMetricsTest.java         | 122 +++++++++++++++++++++
 15 files changed, 263 insertions(+), 20 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0a5849f..520af90 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta4
+ * Add dedicated tcp user timeout for streaming connection (CASSANDRA-16143)
  * Add generatetokens script for offline token allocation strategy generation (CASSANDRA-16205)
  * Remove Windows scripts (CASSANDRA-16171)
  * Improve checksumming and compression in protocol V5 (CASSANDRA-15299)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7953379..db7cbb2 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -921,6 +921,10 @@ request_timeout_in_ms: 10000
 # which picks up the OS default and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
 # internode_tcp_user_timeout_in_ms = 30000
 
+# The amount of time unacknowledged data is allowed on a streaming connection.
+# The default is 5 minutes. If streaming connections time out too early, increase it or set it to 0 to use the OS default.
+# internode_streaming_tcp_user_timeout_in_ms = 300000
+
 # The maximum continuous period a connection may be unwritable in application space
 # internode_application_timeout_in_ms = 30000
 
diff --git a/doc/source/configuration/cass_yaml_file.rst b/doc/source/configuration/cass_yaml_file.rst
index 24e3be0..49471ef 100644
--- a/doc/source/configuration/cass_yaml_file.rst
+++ b/doc/source/configuration/cass_yaml_file.rst
@@ -1406,17 +1406,33 @@ Lowest acceptable value is 10 ms.
 Defensive settings for protecting Cassandra from true network partitions.
 See (CASSANDRA-14358) for details.
 
+
+``internode_tcp_connect_timeout_in_ms``
+---------------------------------------
 The amount of time to wait for internode tcp connections to establish.
-internode_tcp_connect_timeout_in_ms = 2000
 
+*Default Value:* 2000
+
+``internode_tcp_user_timeout_in_ms``
+------------------------------------
 The amount of time unacknowledged data is allowed on a connection before we throw out the connection
 Note this is only supported on Linux + epoll, and it appears to behave oddly above a setting of 30000
 (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
 which picks up the OS default and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
-internode_tcp_user_timeout_in_ms = 30000
 
-The maximum continuous period a connection may be unwritable in application space
-internode_application_timeout_in_ms = 30000
+*Default Value:* 30000
+
+``internode_streaming_tcp_user_timeout_in_ms``
+----------------------------------------------
+The amount of time unacknowledged data is allowed on a streaming connection before we close the connection.
+
+*Default Value:* 300000 (5 minutes)
+
+``internode_application_timeout_in_ms``
+---------------------------------------
+The maximum continuous period a connection may be unwritable in application space.
+
+*Default Value:* 30000
 
 Global, per-endpoint and per-connection limits imposed on messages queued for delivery to other nodes
 and waiting to be processed on arrival from other nodes in the cluster.  These limits are applied to the on-wire
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 2b6cef2..80cb3a6 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -467,6 +467,7 @@ Name                       Type           Description
 ========================== ============== ===========
 IncomingBytes              Counter        Number of bytes streamed to this node from the peer.
 OutgoingBytes              Counter        Number of bytes streamed to the peer endpoint from this node.
+IncomingProcessTime        Timer          The time spent on processing the incoming stream message from the peer.
 ========================== ============== ===========
 
 
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index fd91dd7..60c6945 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -172,12 +172,15 @@ public class Config
 
     // Defensive settings for protecting Cassandra from true network partitions. See (CASSANDRA-14358) for details.
     // The amount of time to wait for internode tcp connections to establish.
-    public int internode_tcp_connect_timeout_in_ms = 2000;
+    public volatile int internode_tcp_connect_timeout_in_ms = 2000;
     // The amount of time unacknowledged data is allowed on a connection before we throw out the connection
     // Note this is only supported on Linux + epoll, and it appears to behave oddly above a setting of 30000
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
-    public int internode_tcp_user_timeout_in_ms = 30000;
+    public volatile int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    // The default is 5 minutes. If streaming connections time out too early, increase it or set it to 0 to use the OS default.
+    public volatile int internode_streaming_tcp_user_timeout_in_ms = 300_000; // 5 minutes
 
     public boolean start_native_transport = true;
     public int native_transport_port = 9042;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index daac364..35c1930 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2099,6 +2099,16 @@ public class DatabaseDescriptor
         conf.internode_tcp_user_timeout_in_ms = value;
     }
 
+    public static int getInternodeStreamingTcpUserTimeoutInMS()
+    {
+        return conf.internode_streaming_tcp_user_timeout_in_ms;
+    }
+
+    public static void setInternodeStreamingTcpUserTimeoutInMS(int value)
+    {
+        conf.internode_streaming_tcp_user_timeout_in_ms = value;
+    }
+
     public static int getInternodeMaxMessageSizeInBytes()
     {
         return conf.internode_max_message_size_in_bytes;
diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
index 80a5e13..756354f 100644
--- a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap;
 
 
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -43,6 +44,8 @@ public class StreamingMetrics
     public static final Counter totalOutgoingRepairSSTables = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingRepairSSTables", null));
     public final Counter incomingBytes;
     public final Counter outgoingBytes;
+    /* Measures the time taken for processing the incoming stream message after being deserialized, including the time to flush to disk. */
+    public final Timer incomingProcessTime;
 
     public static StreamingMetrics get(InetAddressAndPort ip)
     {
@@ -74,5 +77,6 @@ public class StreamingMetrics
         MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.toString().replace(':', '.'));
         incomingBytes = Metrics.counter(factory.createMetricName("IncomingBytes"));
         outgoingBytes= Metrics.counter(factory.createMetricName("OutgoingBytes"));
+        incomingProcessTime = Metrics.timer(factory.createMetricName("IncomingProcessTime"));
     }
 }
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
index 1aab412..c3c7825 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
@@ -410,10 +410,18 @@ public class OutboundConnectionSettings
                                              : DatabaseDescriptor.getInternodeTcpConnectTimeoutInMS();
     }
 
-    public int tcpUserTimeoutInMS()
+    public int tcpUserTimeoutInMS(ConnectionCategory category)
     {
-        return tcpUserTimeoutInMS != null ? tcpUserTimeoutInMS
-                                          : DatabaseDescriptor.getInternodeTcpUserTimeoutInMS();
+        // Reusing tcpUserTimeoutInMS for both messaging and streaming, since the connection is created for either one of them.
+        if (tcpUserTimeoutInMS != null)
+            return tcpUserTimeoutInMS;
+
+        switch (category)
+        {
+            case MESSAGING: return DatabaseDescriptor.getInternodeTcpUserTimeoutInMS();
+            case STREAMING: return DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
+            default: throw new IllegalArgumentException("Unknown connection category: " + category);
+        }
     }
 
     public boolean tcpNoDelay()
@@ -479,7 +487,7 @@ public class OutboundConnectionSettings
                                               applicationSendQueueReserveEndpointCapacityInBytes(),
                                               applicationSendQueueReserveGlobalCapacityInBytes(),
                                               tcpNoDelay(), flushLowWaterMark, flushHighWaterMark,
-                                              tcpConnectTimeoutInMS(), tcpUserTimeoutInMS(), acceptVersions(category),
+                                              tcpConnectTimeoutInMS(), tcpUserTimeoutInMS(category), acceptVersions(category),
                                               from(), socketFactory(), callbacks(), debug(), endpointToVersion());
     }
 
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index abcd25d..26625c1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1322,6 +1322,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.getInternodeTcpUserTimeoutInMS();
     }
 
+    public void setInternodeStreamingTcpUserTimeoutInMS(int value)
+    {
+        Preconditions.checkArgument(value >= 0, "TCP user timeout cannot be negative for internode streaming connection. Got %s", value);
+        DatabaseDescriptor.setInternodeStreamingTcpUserTimeoutInMS(value);
+        logger.info("set internode streaming tcp user timeout to {} ms", value);
+    }
+
+    public int getInternodeStreamingTcpUserTimeoutInMS()
+    {
+        return DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
+    }
+
     public void setCounterWriteRpcTimeout(long value)
     {
         DatabaseDescriptor.setCounterWriteRpcTimeout(value);
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 1df3146..7100ed6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -574,6 +574,9 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void setInternodeTcpUserTimeoutInMS(int value);
     public int getInternodeTcpUserTimeoutInMS();
 
+    public void setInternodeStreamingTcpUserTimeoutInMS(int value);
+    public int getInternodeStreamingTcpUserTimeoutInMS();
+
     public void setCounterWriteRpcTimeout(long value);
     public long getCounterWriteRpcTimeout();
 
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index ff3ff5a..0322f48 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
 import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 import static com.google.common.collect.Iterables.all;
 import static org.apache.cassandra.net.MessagingService.current_version;
@@ -792,7 +793,27 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         // send back file received message
         messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
         StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
-        receivers.get(message.header.tableId).received(message.stream);
+        long receivedStartNanos = System.nanoTime();
+        try
+        {
+            receivers.get(message.header.tableId).received(message.stream);
+        }
+        finally
+        {
+            long latencyNanos = System.nanoTime() - receivedStartNanos;
+            metrics.incomingProcessTime.update(latencyNanos, TimeUnit.NANOSECONDS);
+            long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
+            int timeout = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
+            if (timeout > 0 && latencyMs > timeout)
+                NoSpamLogger.log(logger, NoSpamLogger.Level.WARN,
+                                 1, TimeUnit.MINUTES,
+                                 "The time taken ({} ms) for processing the incoming stream message ({})" +
+                                 " exceeded internode streaming TCP user timeout ({} ms).\n" +
+                                 "The streaming connection might be closed due to tcp user timeout.\n" +
+                                 "Try to increase the internode_streaming_tcp_user_timeout_in_ms" +
+                                 " or set it to 0 to use system defaults.",
+                                 latencyMs, message, timeout);
+        }
     }
 
     public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1697325..90990f1 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1178,6 +1178,8 @@ public class NodeProbe implements AutoCloseable
                 return ssProxy.getInternodeTcpConnectTimeoutInMS();
             case "internodeuser":
                 return ssProxy.getInternodeTcpUserTimeoutInMS();
+            case "internodestreaminguser":
+                return ssProxy.getInternodeStreamingTcpUserTimeoutInMS();
             default:
                 throw new RuntimeException("Timeout type requires one of (" + GetTimeout.TIMEOUT_TYPES + ")");
         }
@@ -1268,6 +1270,9 @@ public class NodeProbe implements AutoCloseable
             case "internodeuser":
                 ssProxy.setInternodeTcpUserTimeoutInMS((int) value);
                 break;
+            case "internodestreaminguser":
+                ssProxy.setInternodeStreamingTcpUserTimeoutInMS((int) value);
+                break;
             default:
                 throw new RuntimeException("Timeout type requires one of (" + GetTimeout.TIMEOUT_TYPES + ")");
         }
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java
index 60637d7..bd720ab 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java
@@ -31,7 +31,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 @Command(name = "gettimeout", description = "Print the timeout of the given type in ms")
 public class GetTimeout extends NodeToolCmd
 {
-    public static final String TIMEOUT_TYPES = "read, range, write, counterwrite, cascontention, truncate, internodeconnect, internodeuser, misc (general rpc_timeout_in_ms)";
+    public static final String TIMEOUT_TYPES = "read, range, write, counterwrite, cascontention, truncate, internodeconnect, internodeuser, internodestreaminguser, misc (general rpc_timeout_in_ms)";
 
     @Arguments(usage = "<timeout_type>", description = "The timeout type, one of (" + TIMEOUT_TYPES + ")")
     private List<String> args = new ArrayList<>();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
index 248d824..3e7f012 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
@@ -19,12 +19,15 @@
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
+import java.util.function.Consumer;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 
 import static org.junit.Assert.assertEquals;
@@ -32,11 +35,13 @@ import static org.junit.Assert.assertEquals;
 public class NodeToolTest extends TestBaseImpl
 {
     private static Cluster CLUSTER;
+    private static IInvokableInstance NODE;
 
     @BeforeClass
     public static void before() throws IOException
     {
         CLUSTER = init(Cluster.build().withNodes(1).start());
+        NODE = CLUSTER.get(1);
     }
 
     @AfterClass
@@ -49,15 +54,15 @@ public class NodeToolTest extends TestBaseImpl
     @Test
     public void testCommands() throws Throwable
     {
-        assertEquals(0, CLUSTER.get(1).nodetool("help"));
-        assertEquals(0, CLUSTER.get(1).nodetool("flush"));
-        assertEquals(1, CLUSTER.get(1).nodetool("not_a_legal_command"));
+        assertEquals(0, NODE.nodetool("help"));
+        assertEquals(0, NODE.nodetool("flush"));
+        assertEquals(1, NODE.nodetool("not_a_legal_command"));
     }
 
     @Test
     public void testCaptureConsoleOutput() throws Throwable
     {
-        NodeToolResult ringResult = CLUSTER.get(1).nodetoolResult("ring");
+        NodeToolResult ringResult = NODE.nodetoolResult("ring");
         ringResult.asserts().stdoutContains("Datacenter: datacenter0");
         ringResult.asserts().stdoutContains("127.0.0.1  rack0       Up     Normal");
         assertEquals("Non-empty error output", "", ringResult.getStderr());
@@ -67,9 +72,37 @@ public class NodeToolTest extends TestBaseImpl
     public void testNodetoolSystemExit()
     {
         // Verify currently calls System.exit, this test uses that knowlege to test System.exit behavior in jvm-dtest
-        CLUSTER.get(1).nodetoolResult("verify", "--check-tokens")
-               .asserts()
-               .failure()
-               .stdoutContains("Token verification requires --extended-verify");
+        NODE.nodetoolResult("verify", "--check-tokens")
+            .asserts()
+            .failure()
+            .stdoutContains("Token verification requires --extended-verify");
+    }
+
+    @Test
+    public void testSetGetTimeout()
+    {
+        Consumer<String> test = timeout ->
+        {
+            if (timeout != null) // null means: do not call settimeout, just check the current value
+                NODE.nodetool("settimeout", "internodestreaminguser", timeout);
+            timeout = NODE.callOnInstance(() -> String.valueOf(DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS()));
+            NODE.nodetoolResult("gettimeout", "internodestreaminguser")
+                .asserts()
+                .success()
+                .stdoutContains("Current timeout for type internodestreaminguser: " + timeout + " ms");
+        };
+
+        test.accept(null); // test the default value
+        test.accept("1000"); // 1 second
+        test.accept("36000000"); // 10 hours
+    }
+
+    @Test
+    public void testSetTimeoutInvalidInput()
+    {
+        NODE.nodetoolResult("settimeout", "internodestreaminguser", "-1")
+            .asserts()
+            .failure()
+            .stdoutContains("timeout must be non-negative");
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
new file mode 100644
index 0000000..22449e8
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+
+    // Writes five flushed batches of rows on node 1, rebuilds node 2 from it, then checks the per-peer streaming metrics on both nodes.
+    @Test
+    public void testStreamMetrics()
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+        final int rowsPerFile = 500;
+        final int files = 5;
+        for (int k = 0; k < files; k++)
+        {
+            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), Integer.toString(i));
+            cluster.get(1).nodetool("flush");
+        }
+
+        cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
+        Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+        assertThat(results.length).isEqualTo(0);
+
+        InetAddressAndPort node1Address = getNodeAddress(1);
+        InetAddressAndPort node2Address = getNodeAddress(2);
+
+        // Trigger streaming from node 2
+        cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE);
+
+        // Assert metrics in node 2. describedAs must precede each assertion: AssertJ ignores a description set after the check.
+        long transmittedBytes = cluster.get(2).callOnInstance(() -> {
+            StreamingMetrics metrics = StreamingMetrics.get(node1Address);
+            assertThat(metrics.incomingBytes.getCount())
+                .describedAs("There should be bytes streamed from the peer.")
+                .isGreaterThan(0);
+            assertThat(metrics.outgoingBytes.getCount())
+                .describedAs("There should not be sstables streamed to the peer.")
+                .isEqualTo(0);
+            assertThat(metrics.incomingProcessTime.getCount())
+                .describedAs("There should be " + files + " files streamed from the peer.")
+                .isEqualTo(files);
+            assertThat(metrics.incomingProcessTime.getSnapshot().getMedian())
+                .describedAs("The median processing time should be non-0")
+                .isGreaterThan(0);
+            return metrics.incomingBytes.getCount();
+        });
+
+        // Assert metrics in node 1
+        cluster.get(1).runOnInstance(() -> {
+            StreamingMetrics metrics = StreamingMetrics.get(node2Address);
+            assertThat(metrics.incomingBytes.getCount())
+                .describedAs("There should not be sstables streamed from the peer.").isEqualTo(0);
+            assertThat(metrics.outgoingBytes.getCount())
+                .describedAs("The outgoingBytes count in node1 should be equals to incomingBytes count in node2")
+                .isEqualTo(transmittedBytes);
+            assertThat(metrics.incomingProcessTime.getCount())
+                .describedAs("There should be no files streamed from the peer.")
+                .isEqualTo(0);
+        });
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org