Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/11/18 03:55:34 UTC

[GitHub] [cassandra] yifan-c opened a new pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

yifan-c opened a new pull request #824:
URL: https://github.com/apache/cassandra/pull/824


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] blerer commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r535099970



##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       It may be worth having that discussion on the ticket to get other people's attention. Jon might have an opinion, as he is the one who opened the ticket.






[GitHub] [cassandra] aholmberg commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
aholmberg commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r533608554



##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       Just a nit -- I see this is following a similar pattern to the above variable, but what is the "user" meant to convey in this naming?

##########
File path: src/java/org/apache/cassandra/metrics/StreamingMetrics.java
##########
@@ -43,6 +44,8 @@
     public static final Counter totalOutgoingRepairSSTables = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingRepairSSTables", null));
     public final Counter incomingBytes;
     public final Counter outgoingBytes;
+    /* Measures the time taken for processing the incoming stream message after being deserialized, including the time to flush to disk. */
+    public final Timer incomingStreamMessageProcessTime;

Review comment:
       Should we add to metrics documentation?
   https://cassandra.apache.org/doc/latest/operating/metrics.html?highlight=metrics#streaming-metrics

##########
File path: src/java/org/apache/cassandra/streaming/StreamSession.java
##########
@@ -792,7 +792,21 @@ public void receive(IncomingStreamMessage message)
         // send back file received message
         messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
         StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
-        receivers.get(message.header.tableId).received(message.stream);
+        long receivedStartNanos = System.nanoTime();
+        try
+        {
+            receivers.get(message.header.tableId).received(message.stream);
+        }
+        finally
+        {
+            long latencyNanos = System.nanoTime() - receivedStartNanos;
+            metrics.incomingStreamMessageProcessTime.update(latencyNanos, TimeUnit.NANOSECONDS);
+            long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
+            int timeout = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
+            if (timeout > 0 && latencyMs > timeout)
+                logger.warn("Time taken ({} ms) for processing the incoming stream message ({}) exceeded internode streaming TCP user timeout ({} ms),",

Review comment:
       Is there anything we might tell the operator here about what to tune, change, or look into system-wise?

##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       The [comment](https://github.com/apache/cassandra/blob/22abff779df097e0ef38180442e9c680b3d41187/src/java/org/apache/cassandra/config/Config.java#L43) at the top seems to indicate that JMX-modified settings should be declared volatile. Is that a concern here?
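Some background on why `volatile` matters for JMX-modified settings: the JMX setter runs on one thread while streaming code reads the value on others, and without `volatile` (or other synchronization) the Java memory model does not guarantee that readers ever observe the new value. A minimal sketch, using a hypothetical `VolatileSettingDemo` class rather than Cassandra's actual `Config`:

```java
// Hypothetical stand-in for a JMX-modifiable Config field; not Cassandra's Config class.
class VolatileSettingDemo
{
    // volatile guarantees that a write made by the (simulated) JMX thread becomes visible
    // to reader threads. Without it, the wait loop below would be allowed to spin forever.
    static volatile int streamingTcpUserTimeoutInMs = 0;

    // Starts a reader that waits for a non-zero value, then performs the "JMX" write.
    // newValue must be non-zero, otherwise the reader never leaves its loop.
    static int observeUpdate(int newValue)
    {
        streamingTcpUserTimeoutInMs = 0;
        int[] observed = new int[1];
        Thread reader = new Thread(() -> {
            while (streamingTcpUserTimeoutInMs == 0)
                Thread.yield();
            observed[0] = streamingTcpUserTimeoutInMs;
        });
        reader.start();
        streamingTcpUserTimeoutInMs = newValue; // simulates a JMX setter call
        try
        {
            reader.join(); // join() also makes observed[0] visible to this thread
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed[0];
    }

    public static void main(String[] args)
    {
        System.out.println(observeUpdate(300000)); // prints 300000
    }
}
```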

##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       Do we need to document these in https://github.com/apache/cassandra-website/blob/trunk/src/doc/4.0-beta3/_sources/configuration/cass_yaml_file.rst.txt ? Worth adding some of your tl;dr from the ticket here, or in docs?

##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       What does it mean for this timeout to default to zero instead of 30000 like its counterpart above?
   
   https://man7.org/linux/man-pages/man7/tcp.7.html
   > If the option value is specified as 0, TCP will use the system default.
   
   Seems sane to me, but is there a reason for the default to be different than `internode_tcp_user_timeout_in_ms`? I haven't been able to find out what the "system default" would typically be, and whether it's much different from 30000. A brief look at the source gave me the impression that not setting it would actually cause this timeout mechanism to be bypassed completely, which is different behavior than we had before with a non-zero default.

##########
File path: src/java/org/apache/cassandra/metrics/StreamingMetrics.java
##########
@@ -43,6 +44,8 @@
     public static final Counter totalOutgoingRepairSSTables = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingRepairSSTables", null));
     public final Counter incomingBytes;
     public final Counter outgoingBytes;
+    /* Measures the time taken for processing the incoming stream message after being deserialized, including the time to flush to disk. */
+    public final Timer incomingStreamMessageProcessTime;

Review comment:
       Should we bootstrap a test for these metrics? I know we haven't bothered historically, but we're [trying to change that](https://issues.apache.org/jira/browse/CASSANDRA-16190) for 4.0+.






[GitHub] [cassandra] bereng commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
bereng commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r533933892



##########
File path: src/java/org/apache/cassandra/metrics/StreamingMetrics.java
##########
@@ -43,6 +44,8 @@
     public static final Counter totalOutgoingRepairSSTables = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingRepairSSTables", null));
     public final Counter incomingBytes;
     public final Counter outgoingBytes;
+    /* Measures the time taken for processing the incoming stream message after being deserialized, including the time to flush to disk. */
+    public final Timer incomingStreamMessageProcessTime;

Review comment:
       +1 for testing the metrics






[GitHub] [cassandra] yifan-c commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
yifan-c commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r535541304



##########
File path: test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
##########
@@ -67,9 +72,37 @@ public void testCaptureConsoleOutput() throws Throwable
     public void testNodetoolSystemExit()
     {
         // Verify currently calls System.exit, this test uses that knowledge to test System.exit behavior in jvm-dtest
-        CLUSTER.get(1).nodetoolResult("verify", "--check-tokens")
-               .asserts()
-               .failure()
-               .stdoutContains("Token verification requires --extended-verify");
+        NODE.nodetoolResult("verify", "--check-tokens")

Review comment:
       I agree with David on the usage.
   The `ToolRunner` is good for non-distributed cases, while the `nodetool` and `NodeToolResult` interfaces exposed by jvm-dtest-api are suitable for the jvm dtest cases.
   The nodetool API is fluent and easy to use (e.g. for assertions), and `NodeToolResult` already contains the stdout and stderr of the command. Therefore I do not see any benefit in wrapping the jvm-dtest-api within the `ToolRunner` interface; doing so would lose those benefits.
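To illustrate the fluency point: a fluent assertion API returns the same object from every check, so a test can chain the exit-code and output checks in one expression, as in the `nodetoolResult(...).asserts().failure().stdoutContains(...)` call in the diff. A minimal sketch with a hypothetical `ToolResult` class (it only mimics the shape of that chain; it is not the jvm-dtest-api `NodeToolResult`):

```java
// Hypothetical result type mimicking a fluent asserts() chain; not the jvm-dtest-api class.
class ToolResult
{
    final int rc;
    final String stdout;
    final String stderr;

    ToolResult(int rc, String stdout, String stderr)
    {
        this.rc = rc;
        this.stdout = stdout;
        this.stderr = stderr;
    }

    Asserts asserts()
    {
        return new Asserts();
    }

    // Every check returns this, so several checks read as one chained expression.
    class Asserts
    {
        Asserts success()
        {
            if (rc != 0)
                throw new AssertionError("expected success but rc=" + rc + ", stderr: " + stderr);
            return this;
        }

        Asserts failure()
        {
            if (rc == 0)
                throw new AssertionError("expected failure but the command succeeded");
            return this;
        }

        Asserts stdoutContains(String expected)
        {
            if (!stdout.contains(expected))
                throw new AssertionError("stdout does not contain '" + expected + "': " + stdout);
            return this;
        }
    }

    public static void main(String[] args)
    {
        new ToolResult(1, "Token verification requires --extended-verify", "")
            .asserts()
            .failure()
            .stdoutContains("Token verification requires --extended-verify");
        System.out.println("assertions passed");
    }
}
```

Because stdout and stderr live on the result object itself, a wrapper adds a layer without adding information, which is the crux of the argument above.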






[GitHub] [cassandra] blerer commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537348468



##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        cluster.close();
+    }
+
+
+    @Test
+    public void testStreamMetrics()
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+        final int rowsPerFile = 500;
+        final int files = 5;
+        for (int k = 0; k < files; k++)
+        {
+            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                cluster.get(1).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
+            cluster.get(1).nodetool("flush");
+        }
+
+        cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
+        {
+            Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+            assertThat(results.length).isEqualTo(0);
+        }

Review comment:
       Is there a reason for enclosing that part of code between braces?
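For context on the braces: a bare block limits the scope of the variables declared inside it, so a name such as `results` can be declared again later in the same method. A minimal sketch in plain Java (no dtest dependencies; `countRowsTwice` is a hypothetical helper):

```java
class BlockScopeDemo
{
    static int countRowsTwice(int[][] before, int[][] after)
    {
        int total = 0;
        {
            int[][] results = before; // visible only inside this block
            total += results.length;
        }
        {
            int[][] results = after; // legal again: the earlier 'results' is out of scope
            total += results.length;
        }
        return total;
    }

    public static void main(String[] args)
    {
        System.out.println(countRowsTwice(new int[0][], new int[3][])); // prints 3
    }
}
```

When the name is not reused later in the method, such a block has no effect on behavior.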






[GitHub] [cassandra] jonmeredith commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
jonmeredith commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r535303747



##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       Will take it back to the JIRA ticket and comment there. 






[GitHub] [cassandra] yifan-c commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
yifan-c commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r535714110



##########
File path: src/java/org/apache/cassandra/streaming/StreamSession.java
##########
@@ -792,7 +792,21 @@ public void receive(IncomingStreamMessage message)
         // send back file received message
         messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
         StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
-        receivers.get(message.header.tableId).received(message.stream);
+        long receivedStartNanos = System.nanoTime();
+        try
+        {
+            receivers.get(message.header.tableId).received(message.stream);
+        }
+        finally
+        {
+            long latencyNanos = System.nanoTime() - receivedStartNanos;
+            metrics.incomingStreamMessageProcessTime.update(latencyNanos, TimeUnit.NANOSECONDS);
+            long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
+            int timeout = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
+            if (timeout > 0 && latencyMs > timeout)
+                logger.warn("Time taken ({} ms) for processing the incoming stream message ({}) exceeded internode streaming TCP user timeout ({} ms),",

Review comment:
       Maybe suggest tuning (increasing) `internode_streaming_tcp_user_timeout_in_ms`.
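A sketch of how the timing block in `StreamSession.receive()` could carry that hint, assuming hypothetical helper names (`timedReceive`, `warningFor`), illustrative message wording, and `java.util.logging` standing in for Cassandra's SLF4J logger. As in the diff, latency is recorded in a `finally` block so it is captured even when `received()` throws, and a timeout of 0 (OS default) skips the comparison:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.logging.Logger;

// Hypothetical sketch of the timing + warning pattern; not Cassandra's actual code.
class StreamProcessTiming
{
    private static final Logger logger = Logger.getLogger(StreamProcessTiming.class.getName());

    // Returns the warning text when processing exceeded the timeout, or null otherwise.
    // timeoutMs <= 0 means "use the OS default", so there is nothing to compare against.
    static String warningFor(long latencyNanos, int timeoutMs)
    {
        long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
        if (timeoutMs <= 0 || latencyMs <= timeoutMs)
            return null;
        return String.format("Time taken (%d ms) for processing the incoming stream message exceeded "
                             + "internode streaming TCP user timeout (%d ms). "
                             + "Consider increasing internode_streaming_tcp_user_timeout_in_ms.",
                             latencyMs, timeoutMs);
    }

    // Runs the receive step; latency is recorded and checked even if the step throws.
    static void timedReceive(Runnable receive, LongConsumer recordNanos, int timeoutMs)
    {
        long startNanos = System.nanoTime();
        try
        {
            receive.run();
        }
        finally
        {
            long latencyNanos = System.nanoTime() - startNanos;
            recordNanos.accept(latencyNanos); // stands in for the Timer metric update
            String warning = warningFor(latencyNanos, timeoutMs);
            if (warning != null)
                logger.warning(warning);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(warningFor(TimeUnit.MILLISECONDS.toNanos(400_000), 300_000));
    }
}
```

Splitting the message construction out of the `finally` block keeps it testable without having to make a real stream slow.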






[GitHub] [cassandra] yifan-c commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
yifan-c commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537928060



##########
File path: src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
##########
@@ -410,10 +410,19 @@ public int tcpConnectTimeoutInMS()
                                              : DatabaseDescriptor.getInternodeTcpConnectTimeoutInMS();
     }
 
-    public int tcpUserTimeoutInMS()
+    public int tcpUserTimeoutInMS(ConnectionCategory category)
     {
-        return tcpUserTimeoutInMS != null ? tcpUserTimeoutInMS
-                                          : DatabaseDescriptor.getInternodeTcpUserTimeoutInMS();
+        // Reusing tcpUserTimeoutInMS for both messaging and streaming, since the connection is created for either one of them.
+        if (tcpUserTimeoutInMS != null)
+            return tcpUserTimeoutInMS;
+
+        if (category.isMessaging())

Review comment:
       👍  I like this form since we are doing exhaustive matching here. 
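A minimal sketch of the resolution order in the diff: an explicitly configured value wins (one connection serves only one category), otherwise the per-category default applies. The enum and the default values passed in are hypothetical stand-ins for `ConnectionCategory` and `DatabaseDescriptor`, and the sketch uses a `switch` where the diff uses `category.isMessaging()`:

```java
// Hypothetical sketch; the enum and defaults stand in for ConnectionCategory/DatabaseDescriptor.
class TcpUserTimeoutResolver
{
    enum ConnectionCategory { MESSAGING, STREAMING }

    static int tcpUserTimeoutInMS(Integer explicitOverride, ConnectionCategory category,
                                  int messagingDefaultMs, int streamingDefaultMs)
    {
        // An explicit per-connection setting applies regardless of category.
        if (explicitOverride != null)
            return explicitOverride;

        // Exhaustive matching: an unhandled category reaches default and throws
        // instead of silently falling back to the messaging timeout.
        switch (category)
        {
            case MESSAGING:
                return messagingDefaultMs;
            case STREAMING:
                return streamingDefaultMs;
            default:
                throw new IllegalArgumentException("Unknown connection category: " + category);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(tcpUserTimeoutInMS(null, ConnectionCategory.STREAMING, 30000, 300000)); // prints 300000
    }
}
```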






[GitHub] [cassandra] aholmberg commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
aholmberg commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r534289322



##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       > Finding a proper default value is hard. 
   
   💯  couldn't agree more
   
   > The reason I set it 0 is that it seems to be working better for streaming
   
   Sure. I'm not fully arguing against a different value, just trying to reason about the change in behavior. It could even be that we want the streaming timeout set to some higher value, but not zero, since zero might defeat the purpose of the original ticket introducing TCP User Timeout (CASSANDRA-14358).
   For the record, [here](https://github.com/torvalds/linux/blob/9ff9b0d392ea08090cd1780fb196f36dbb586529/net/ipv4/tcp_timer.c#L363-L369) is the area of code that made me think it just disables the mechanism altogether. I only point it out to say that I think it's a little different than using some other "system default" as some of the higher level docs say. I'm wondering if the "system default" is to not have this mechanism enabled.







[GitHub] [cassandra] bereng commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
bereng commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r533930553



##########
File path: src/java/org/apache/cassandra/service/StorageService.java
##########
@@ -1321,6 +1321,19 @@ public int getInternodeTcpUserTimeoutInMS()
         return DatabaseDescriptor.getInternodeTcpUserTimeoutInMS();
     }
 
+    public void setInternodeStreamingTcpUserTimeoutInMS(int value)
+    {
+        if (value < 0)

Review comment:
       Use `Preconditions` for consistency?
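A sketch of what the `Preconditions` form could look like. To keep the example dependency-free, `checkArgument` below is a local stand-in that mirrors Guava's `Preconditions.checkArgument(boolean, String, Object...)` (throwing `IllegalArgumentException` with a formatted message); in Cassandra one would call Guava's method directly. The class, field, and message text are hypothetical:

```java
// Hypothetical setter; checkArgument is a local stand-in for Guava's Preconditions.checkArgument.
class StreamingTimeoutSetter
{
    // Initial value is illustrative only.
    private static volatile int internodeStreamingTcpUserTimeoutInMS = 0;

    // Throws IllegalArgumentException with a formatted message when the condition is false.
    static void checkArgument(boolean expression, String template, Object... args)
    {
        if (!expression)
            throw new IllegalArgumentException(String.format(template, args));
    }

    static void setInternodeStreamingTcpUserTimeoutInMS(int value)
    {
        // 0 stays valid: it means "use the OS default" for TCP_USER_TIMEOUT.
        checkArgument(value >= 0, "internode_streaming_tcp_user_timeout_in_ms must be >= 0, got %s", value);
        internodeStreamingTcpUserTimeoutInMS = value;
    }

    static int getInternodeStreamingTcpUserTimeoutInMS()
    {
        return internodeStreamingTcpUserTimeoutInMS;
    }

    public static void main(String[] args)
    {
        setInternodeStreamingTcpUserTimeoutInMS(600000);
        System.out.println(getInternodeStreamingTcpUserTimeoutInMS()); // prints 600000
        try
        {
            setInternodeStreamingTcpUserTimeoutInMS(-1);
        }
        catch (IllegalArgumentException e)
        {
            System.out.println(e.getMessage());
        }
    }
}
```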






[GitHub] [cassandra] blerer commented on pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
blerer commented on pull request #824:
URL: https://github.com/apache/cassandra/pull/824#issuecomment-741760260


   Thanks to everybody.




[GitHub] [cassandra] dcapwell commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
dcapwell commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537805571



##########
File path: doc/source/configuration/cass_yaml_file.rst
##########
@@ -1406,17 +1406,33 @@ Lowest acceptable value is 10 ms.
 Defensive settings for protecting Cassandra from true network partitions.
 See (CASSANDRA-14358) for details.
 
+
+``internode_tcp_connect_timeout_in_ms``
+---------------------------------------
 The amount of time to wait for internode tcp connections to establish.
-internode_tcp_connect_timeout_in_ms = 2000
 
+*Default Value:* 2000
+
+``internode_tcp_user_timeout_in_ms``
+------------------------------------
 The amount of time unacknowledged data is allowed on a connection before we throw out the connection
 Note this is only supported on Linux + epoll, and it appears to behave oddly above a setting of 30000
 (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
 which picks up the OS default and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
-internode_tcp_user_timeout_in_ms = 30000
 
-The maximum continuous period a connection may be unwritable in application space
-internode_application_timeout_in_ms = 30000
+*Default Value:* 30000
+
+``internode_streaming_tcp_user_timeout_in_ms``
+----------------------------------------------
+The amount of time unacknowledged data is allowed on a streaming connection before we close the connection.
+
+*Default Value:* 3000000

Review comment:
       Should be `300000`; there is an extra zero.
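Converting both values to minutes shows how large the typo is. This is a small hypothetical helper (`TimeoutDefaults` is not part of the PR) built on `java.util.concurrent.TimeUnit`.

```java
import java.util.concurrent.TimeUnit;

final class TimeoutDefaults
{
    // Human-readable view of a millisecond setting, handy when reviewing documented defaults.
    static String describe(long millis)
    {
        return millis + " ms = " + TimeUnit.MILLISECONDS.toMinutes(millis) + " min";
    }
}
```

`describe(300000)` gives `300000 ms = 5 min`. The documented `3000000` would be `50 min`.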

##########
File path: src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
##########
@@ -410,10 +410,19 @@ public int tcpConnectTimeoutInMS()
                                              : DatabaseDescriptor.getInternodeTcpConnectTimeoutInMS();
     }
 
-    public int tcpUserTimeoutInMS()
+    public int tcpUserTimeoutInMS(ConnectionCategory category)
     {
-        return tcpUserTimeoutInMS != null ? tcpUserTimeoutInMS
-                                          : DatabaseDescriptor.getInternodeTcpUserTimeoutInMS();
+        // Reusing tcpUserTimeoutInMS for both messaging and streaming, since the connection is created for either one of them.
+        if (tcpUserTimeoutInMS != null)
+            return tcpUserTimeoutInMS;
+
+        if (category.isMessaging())

Review comment:
       Personal preference:
   
   ```
   switch (category)
   {
     case MESSAGING: return DatabaseDescriptor.getInternodeTcpUserTimeoutInMS();
     case STREAMING: return DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
     default: throw new IllegalArgumentException("Unknown connection category: " + category);
   }
   ```
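A self-contained sketch of how the suggested `switch` could fit into `tcpUserTimeoutInMS(ConnectionCategory)` from the diff, with the explicit per-connection override still checked first. The enum mirrors the `MESSAGING`/`STREAMING` cases named in the comment. The two constants are placeholders for the `DatabaseDescriptor` getters, and their values (30000 and 300000) follow the defaults discussed in this thread. `TcpUserTimeouts` is a hypothetical class, not the real `OutboundConnectionSettings`.

```java
// Placeholder mirroring the two categories used in the suggested switch.
enum ConnectionCategory { MESSAGING, STREAMING }

final class TcpUserTimeouts
{
    static final int MESSAGING_TIMEOUT_MS = 30000;  // placeholder for getInternodeTcpUserTimeoutInMS()
    static final int STREAMING_TIMEOUT_MS = 300000; // placeholder for getInternodeStreamingTcpUserTimeoutInMS()

    private final Integer tcpUserTimeoutInMS; // explicit per-connection override, may be null

    TcpUserTimeouts(Integer tcpUserTimeoutInMS)
    {
        this.tcpUserTimeoutInMS = tcpUserTimeoutInMS;
    }

    int tcpUserTimeoutInMS(ConnectionCategory category)
    {
        // An explicit override wins regardless of the connection category.
        if (tcpUserTimeoutInMS != null)
            return tcpUserTimeoutInMS;

        switch (category)
        {
            case MESSAGING: return MESSAGING_TIMEOUT_MS;
            case STREAMING: return STREAMING_TIMEOUT_MS;
            default: throw new IllegalArgumentException("Unknown connection category: " + category);
        }
    }
}
```

The `default` branch is required for the method to compile, because a statement `switch` over an enum is not treated as exhaustive. It also fails loudly if a new category is added later without a matching case.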

##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        cluster.close();

Review comment:
       Should do a null check, as it's possible that setup fails.
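A minimal sketch of the guarded teardown. In JUnit 4, `@AfterClass` methods still run when an `@BeforeClass` method throws. If `Cluster.build()...start()` fails, the static field is never assigned, and an unguarded `close()` would add a `NullPointerException` on top of the real failure. `FakeCluster` and `TeardownSketch` are hypothetical stand-ins so that the sketch runs without jvm-dtest.

```java
// Hypothetical stand-in for the jvm-dtest Cluster; only close() matters here.
final class FakeCluster implements AutoCloseable
{
    static int closed = 0;

    @Override
    public void close()
    {
        closed++;
    }
}

final class TeardownSketch
{
    static FakeCluster cluster; // stays null if setup throws before the assignment

    static void setupCluster(boolean fail)
    {
        if (fail)
            throw new IllegalStateException("simulated setup failure");
        cluster = new FakeCluster();
    }

    static void teardownCluster()
    {
        // Only close what was actually created.
        if (cluster != null)
            cluster.close();
    }
}
```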

##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        cluster.close();
+    }
+
+
+    @Test
+    public void testStreamMetrics()
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+        final int rowsPerFile = 500;
+        final int files = 5;
+        for (int k = 0; k < files; k++)
+        {
+            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                cluster.get(1).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));

Review comment:
       FYI, there is a `withKeyspace` method in scope that does this for you:
   
   ```
   cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), Integer.toString(i));
   ```
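A sketch of the substitution the helper performs. It assumes that `withKeyspace` passes the statement to `String.format` with the test keyspace name as the only argument. The `KEYSPACE` value and the `KeyspaceFormatting` class are placeholders for illustration, not the test base's actual code.

```java
final class KeyspaceFormatting
{
    static final String KEYSPACE = "distributed_test_keyspace"; // placeholder keyspace name

    // Assumed behaviour: replace the %s placeholder with the test keyspace.
    static String withKeyspace(String replaceIn)
    {
        return String.format(replaceIn, KEYSPACE);
    }
}
```

Bind markers such as `?` and quoted literals pass through unchanged, because only `%s` is a format specifier.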

##########
File path: src/java/org/apache/cassandra/streaming/StreamSession.java
##########
@@ -792,7 +792,25 @@ public void receive(IncomingStreamMessage message)
         // send back file received message
         messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
         StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
-        receivers.get(message.header.tableId).received(message.stream);
+        long receivedStartNanos = System.nanoTime();
+        try
+        {
+            receivers.get(message.header.tableId).received(message.stream);

Review comment:
       We call `CassandraStreamReceiver::finished` asynchronously, so this time does not include the actual commit? I guess that makes sense, given the metric is named `incomingProcessTime` rather than processing time.

##########
File path: src/java/org/apache/cassandra/streaming/StreamSession.java
##########
@@ -792,7 +792,25 @@ public void receive(IncomingStreamMessage message)
         // send back file received message
         messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
         StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
-        receivers.get(message.header.tableId).received(message.stream);
+        long receivedStartNanos = System.nanoTime();
+        try
+        {
+            receivers.get(message.header.tableId).received(message.stream);
+        }
+        finally
+        {
+            long latencyNanos = System.nanoTime() - receivedStartNanos;
+            metrics.incomingProcessTime.update(latencyNanos, TimeUnit.NANOSECONDS);
+            long latencyMs = TimeUnit.NANOSECONDS.toMillis(latencyNanos);
+            int timeout = DatabaseDescriptor.getInternodeStreamingTcpUserTimeoutInMS();
+            if (timeout > 0 && latencyMs > timeout)
+                logger.warn("The time taken ({} ms) for processing the incoming stream message ({})" +

Review comment:
       I am worried that this will produce a lot of false positives, and it could produce a lot of log spam if multiple sessions are running. Should we use a `NoSpamLogger` here?
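Cassandra has `NoSpamLogger` in `org.apache.cassandra.utils` for this purpose. The sketch below is a simplified, hypothetical stand-in that shows the idea: at most one message per interval, across threads. It does not reproduce `NoSpamLogger`'s API. It also mirrors the condition from the diff, so a warning is only considered when a timeout is configured (> 0) and exceeded. The clock and the output sink are injected so the behaviour can be checked deterministically.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

final class RateLimitedWarner
{
    private final long intervalNanos;
    private final LongSupplier nanoClock;
    private final Consumer<String> sink;
    private final AtomicLong nextAllowedNanos;

    RateLimitedWarner(long interval, TimeUnit unit, LongSupplier nanoClock, Consumer<String> sink)
    {
        this.intervalNanos = unit.toNanos(interval);
        this.nanoClock = nanoClock;
        this.sink = sink;
        this.nextAllowedNanos = new AtomicLong(nanoClock.getAsLong()); // first warning is allowed
    }

    // Emits the message only if the suppression window has elapsed; returns whether it was emitted.
    boolean warn(String message)
    {
        long now = nanoClock.getAsLong();
        long next = nextAllowedNanos.get();
        if (now - next < 0)
            return false; // still inside the suppression window
        if (!nextAllowedNanos.compareAndSet(next, now + intervalNanos))
            return false; // another thread emitted a warning first
        sink.accept(message);
        return true;
    }

    // Same condition as the diff: only warn when a timeout is set and the latency exceeds it.
    boolean maybeWarnSlowProcessing(long latencyMs, int timeoutMs)
    {
        if (timeoutMs > 0 && latencyMs > timeoutMs)
            return warn("Processing the incoming stream message took " + latencyMs
                        + " ms, longer than the streaming TCP user timeout of " + timeoutMs + " ms");
        return false;
    }
}
```

With one warner shared by all sessions, concurrent slow sessions produce a single line per interval instead of one line per message.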


[GitHub] [cassandra] yifan-c commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
yifan-c commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r533777976



##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -178,6 +178,8 @@
     // (it takes much longer than 30s) as of Linux 4.12. If you want something that high set this to 0
     // (which picks up the OS default) and configure the net.ipv4.tcp_retries2 sysctl to be ~8.
     public int internode_tcp_user_timeout_in_ms = 30000;
+    // Similar to internode_tcp_user_timeout_in_ms but used specifically for streaming connection.
+    public int internode_streaming_tcp_user_timeout_in_ms = 0;

Review comment:
       > What does it mean for this timeout to default to zero instead of 30000 like its counterpart above?
   
   Finding a proper default value is hard. I set it to 0 because, according to the ticket description, that seems to work better for streaming.
   I guess we could set it to the same value as its counterpart above (30000) and adjust it on demand.


[GitHub] [cassandra] bereng commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
bereng commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r533935165



##########
File path: test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
##########
@@ -67,9 +72,37 @@ public void testCaptureConsoleOutput() throws Throwable
     public void testNodetoolSystemExit()
     {
         // Verify currently calls System.exit, this test uses that knowlege to test System.exit behavior in jvm-dtest
-        CLUSTER.get(1).nodetoolResult("verify", "--check-tokens")
-               .asserts()
-               .failure()
-               .stdoutContains("Token verification requires --extended-verify");
+        NODE.nodetoolResult("verify", "--check-tokens")

Review comment:
       We have been trying to consolidate tool testing across the board with the recently introduced `ToolRunner`. I would move these new tests to that interface for consistency and to benefit from any future development there. See `NodeToolRingTest` for an example.


[GitHub] [cassandra] blerer commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
blerer commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537354436



##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        cluster.close();
+    }
+
+
+    @Test
+    public void testStreamMetrics()
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+        final int rowsPerFile = 500;
+        final int files = 5;
+        for (int k = 0; k < files; k++)
+        {
+            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                cluster.get(1).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
+            cluster.get(1).nodetool("flush");
+        }
+
+        cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
+        {
+            Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+            assertThat(results.length).isEqualTo(0);
+        }

Review comment:
       Is there a reason for enclosing that part of the code in braces?

##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        cluster.close();
+    }
+
+
+    @Test
+    public void testStreamMetrics()
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+        final int rowsPerFile = 500;
+        final int files = 5;
+        for (int k = 0; k < files; k++)
+        {
+            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                cluster.get(1).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
+            cluster.get(1).nodetool("flush");
+        }
+
+        cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
+        {
+            Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+            assertThat(results.length).isEqualTo(0);
+        }
+
+        InetAddressAndPort node1Address = getNodeAddress(1);
+        InetAddressAndPort node2Address = getNodeAddress(2);
+
+        cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE);
+
+        // Trigger streaming in node 2 and assert metrics on completion.
+        cluster.get(2).runOnInstance(() -> {
+            StreamingMetrics metrics = StreamingMetrics.get(node1Address);
+            assertThat(metrics.incomingBytes.getCount())
+                .isGreaterThan(0)
+                .describedAs("There should be bytes streamed from the peer.");
+            assertThat(metrics.outgoingBytes.getCount())
+                .isEqualTo(0)
+                .describedAs("There should not be sstables streamed to the peer.");
+            assertThat(metrics.incomingProcessTime.getCount())
+                .isEqualTo(files)
+                .describedAs("There should be " + files + " files streamed from the peer.");
+            assertThat(metrics.incomingProcessTime.getSnapshot().getMedian())
+                .isGreaterThan(0)
+                .describedAs("The median processing time should be non-0");
+        });
+
+        // Assert metrics in node 1
+        cluster.get(1).runOnInstance(() -> {
+            StreamingMetrics metrics = StreamingMetrics.get(node2Address);
+            assertThat(metrics.incomingBytes.getCount())
+                .isEqualTo(0).describedAs("There should not be sstables streamed from the peer.");
+            assertThat(metrics.outgoingBytes.getCount())
+                .isGreaterThan(0)
+                .describedAs("There should be bytes streamed to the peer.");
+            assertThat(metrics.incomingProcessTime.getCount())
+                .isEqualTo(0)
+                .describedAs("There should be no files streamed from the peer.");
+        });

Review comment:
       Could we check that node1's `outgoingBytes` is equal to node2's `incomingBytes`?
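A plain-Java sketch of the suggested cross-node check. It assumes both counters have already been read on their respective nodes and returned to the test, for instance through jvm-dtest's `callOnInstance` instead of asserting inside `runOnInstance`. `StreamByteCheck` is a hypothetical helper. If the real test keeps AssertJ, `describedAs` has to be called before the assertion method, because AssertJ ignores a description set after the assertion.

```java
final class StreamByteCheck
{
    // node1OutgoingBytes: node1's outgoingBytes towards node2
    // node2IncomingBytes: node2's incomingBytes from node1
    static void assertStreamedBytesMatch(long node1OutgoingBytes, long node2IncomingBytes)
    {
        if (node1OutgoingBytes <= 0)
            throw new AssertionError("node1 should have streamed bytes to node2, got " + node1OutgoingBytes);
        if (node1OutgoingBytes != node2IncomingBytes)
            throw new AssertionError("node1 outgoingBytes (" + node1OutgoingBytes
                                     + ") should equal node2 incomingBytes (" + node2IncomingBytes + ")");
    }
}
```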


[GitHub] [cassandra] dcapwell commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
dcapwell commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r535524693



##########
File path: test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
##########
@@ -67,9 +72,37 @@ public void testCaptureConsoleOutput() throws Throwable
     public void testNodetoolSystemExit()
     {
         // Verify currently calls System.exit, this test uses that knowlege to test System.exit behavior in jvm-dtest
-        CLUSTER.get(1).nodetoolResult("verify", "--check-tokens")
-               .asserts()
-               .failure()
-               .stdoutContains("Token verification requires --extended-verify");
+        NODE.nodetoolResult("verify", "--check-tokens")

Review comment:
       For jvm-dtest I would not agree with this. `NodeToolResult` is more fleshed out than what `ToolRunner` offers (as it is deeply integrated with jvm-dtest), and the extra `ToolRunner` wrapper adds complexity without simplifying anything for test authors.
   
   I feel that `ToolRunner` should be used when calling `main` or invoking a CLI on disk, but not when using the jvm-dtest nodetool.


[GitHub] [cassandra] yifan-c commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
yifan-c commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537696405



##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        cluster.close();
+    }
+
+
+    @Test
+    public void testStreamMetrics()
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+        final int rowsPerFile = 500;
+        final int files = 5;
+        for (int k = 0; k < files; k++)
+        {
+            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                cluster.get(1).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
+            cluster.get(1).nodetool("flush");
+        }
+
+        cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
+        {
+            Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+            assertThat(results.length).isEqualTo(0);
+        }

Review comment:
       It was just to visually associate the code block with the preceding `TRUNCATE` operation. I will remove it.


[GitHub] [cassandra] yifan-c commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
yifan-c commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537700295



##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = Cluster.build().withNodes(2)
+                         .withDataDirCount(1)
+                         .withConfig(config -> config.with(NETWORK)
+                                                     .set("stream_entire_sstables", false))
+                         .start();
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+    }
+
+    private static InetAddressAndPort getNodeAddress(int num)
+    {
+        InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
+        return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
+                                                               broadcastAddress.getPort());
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        cluster.close();
+    }
+
+
+    @Test
+    public void testStreamMetrics()
+    {
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+        final int rowsPerFile = 500;
+        final int files = 5;
+        for (int k = 0; k < files; k++)
+        {
+            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                cluster.get(1).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
+            cluster.get(1).nodetool("flush");
+        }
+
+        cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
+        {
+            Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+            assertThat(results.length).isEqualTo(0);
+        }
+
+        InetAddressAndPort node1Address = getNodeAddress(1);
+        InetAddressAndPort node2Address = getNodeAddress(2);
+
+        cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE);
+
+        // Trigger streaming in node 2 and assert metrics on completion.
+        cluster.get(2).runOnInstance(() -> {
+            StreamingMetrics metrics = StreamingMetrics.get(node1Address);
+            assertThat(metrics.incomingBytes.getCount())
+                .isGreaterThan(0)
+                .describedAs("There should be bytes streamed from the peer.");
+            assertThat(metrics.outgoingBytes.getCount())
+                .isEqualTo(0)
+                .describedAs("There should not be sstables streamed to the peer.");
+            assertThat(metrics.incomingProcessTime.getCount())
+                .isEqualTo(files)
+                .describedAs("There should be " + files + " files streamed from the peer.");
+            assertThat(metrics.incomingProcessTime.getSnapshot().getMedian())
+                .isGreaterThan(0)
+                .describedAs("The median processing time should be non-0");
+        });
+
+        // Assert metrics in node 1
+        cluster.get(1).runOnInstance(() -> {
+            StreamingMetrics metrics = StreamingMetrics.get(node2Address);
+            assertThat(metrics.incomingBytes.getCount())
+                .isEqualTo(0).describedAs("There should not be sstables streamed from the peer.");
+            assertThat(metrics.outgoingBytes.getCount())
+                .isGreaterThan(0)
+                .describedAs("There should be bytes streamed to the peer.");
+            assertThat(metrics.incomingProcessTime.getCount())
+                .isEqualTo(0)
+                .describedAs("There should be no files streamed from the peer.");
+        });

Review comment:
       Good idea! 👍 
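One detail in the assertions above: in AssertJ, `describedAs(...)` (alias `as(...)`) has to be called before the assertion method. According to the AssertJ documentation, a description set afterwards is ignored, because a failing assertion throws and breaks the chain. In the test as quoted, each `describedAs` follows `isGreaterThan`/`isEqualTo`, so those messages would not show up in a failure report. The stdlib-only sketch below demonstrates the effect with a hypothetical `Check` class that mimics the fluent pattern. It is not AssertJ.

```java
class DescriptionOrderDemo
{
    /** Hypothetical fluent checker (NOT AssertJ): throws as soon as a check fails. */
    static final class Check
    {
        private final long actual;
        private String description = "";

        private Check(long actual) { this.actual = actual; }

        static Check check(long actual) { return new Check(actual); }

        Check describedAs(String description)
        {
            this.description = description;
            return this;
        }

        Check isGreaterThan(long expected)
        {
            if (actual <= expected)
                throw new AssertionError(prefix() + "expected " + actual + " > " + expected);
            return this;
        }

        private String prefix()
        {
            return description.isEmpty() ? "" : "[" + description + "] ";
        }
    }

    /** Runs an assertion and returns its failure message, or null if it passed. */
    static String failureMessage(Runnable assertion)
    {
        try
        {
            assertion.run();
            return null;
        }
        catch (AssertionError e)
        {
            return e.getMessage();
        }
    }

    public static void main(String[] args)
    {
        long incomingBytes = 0; // simulate a counter that never moved

        // Description chained after the check: the check throws first, so the text is lost.
        System.out.println(failureMessage(() ->
            Check.check(incomingBytes).isGreaterThan(0).describedAs("bytes streamed from peer")));

        // Description chained before the check: it becomes part of the failure message.
        System.out.println(failureMessage(() ->
            Check.check(incomingBytes).describedAs("bytes streamed from peer").isGreaterThan(0)));
    }
}
```

The first call prints `expected 0 > 0`. The second prints `[bytes streamed from peer] expected 0 > 0`. In AssertJ the equivalent ordering would be `assertThat(metrics.incomingBytes.getCount()).describedAs("There should be bytes streamed from the peer.").isGreaterThan(0);`.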






[GitHub] [cassandra] dcapwell commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
dcapwell commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537883583



##########
File path: test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StreamingMetricsTest extends TestBaseImpl

Review comment:
       ran in a loop, this was stable






[GitHub] [cassandra] yifan-c commented on a change in pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
yifan-c commented on a change in pull request #824:
URL: https://github.com/apache/cassandra/pull/824#discussion_r537276503



##########
File path: src/java/org/apache/cassandra/metrics/StreamingMetrics.java
##########
@@ -43,6 +44,8 @@
     public static final Counter totalOutgoingRepairSSTables = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingRepairSSTables", null));
     public final Counter incomingBytes;
     public final Counter outgoingBytes;
+    /* Measures the time taken for processing the incoming stream message after being deserialized, including the time to flush to disk. */
+    public final Timer incomingStreamMessageProcessTime;

Review comment:
       I have added one jvm dtest to test streaming metrics that is related to this patch.  
   The test can be a starting point for [CASSANDRA-16190](https://issues.apache.org/jira/browse/CASSANDRA-16190).
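Cassandra's metrics are built on the Dropwizard (Codahale) Metrics library, where a `Timer` records both an event count and a distribution of durations. The dtest reads this through `getCount()` and `getSnapshot().getMedian()`. The stdlib-only sketch below shows the general idea behind a metric like `incomingStreamMessageProcessTime`: start the clock once a message has been deserialized and stop it after the write, including the flush to disk, has finished. `SimpleTimer` and `processIncoming` are hypothetical stand-ins, not Cassandra or Dropwizard code. They only mirror the `update(duration, unit)` shape of Dropwizard's `Timer`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

class ProcessTimeSketch
{
    /** Hypothetical stand-in for a metrics Timer: records durations, reports count and median. */
    static final class SimpleTimer
    {
        private final List<Long> durationsNanos = new ArrayList<>();

        synchronized void update(long duration, TimeUnit unit)
        {
            durationsNanos.add(unit.toNanos(duration));
        }

        synchronized long getCount()
        {
            return durationsNanos.size();
        }

        /** Median in nanoseconds (lower middle for even counts); 0 if nothing was recorded. */
        synchronized long medianNanos()
        {
            if (durationsNanos.isEmpty())
                return 0;
            List<Long> sorted = new ArrayList<>(durationsNanos);
            Collections.sort(sorted);
            return sorted.get((sorted.size() - 1) / 2);
        }
    }

    /** Hypothetical processing step for one deserialized message: write it and flush to disk. */
    static void processIncoming(SimpleTimer timer, Runnable applyAndFlush)
    {
        long start = System.nanoTime();
        try
        {
            applyAndFlush.run();
        }
        finally
        {
            // In this sketch the duration is recorded even if processing throws.
            timer.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    public static void main(String[] args)
    {
        SimpleTimer incomingProcessTime = new SimpleTimer();
        final int files = 5;
        for (int i = 0; i < files; i++)
            processIncoming(incomingProcessTime, () -> { /* write and flush one SSTable */ });
        System.out.println("count = " + incomingProcessTime.getCount());
        System.out.println("median ns = " + incomingProcessTime.medianNanos());
    }
}
```

Because the recorded time covers the flush, this kind of metric makes a slow SSTable writer `finish()` (the failure mode this ticket is about) visible per peer.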






[GitHub] [cassandra] blerer closed pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
blerer closed pull request #824:
URL: https://github.com/apache/cassandra/pull/824


   




[GitHub] [cassandra] aholmberg commented on pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
aholmberg commented on pull request #824:
URL: https://github.com/apache/cassandra/pull/824#issuecomment-740057448


   Current change set looks good to me, assuming Benjamin's comments are addressed, and the longer timeout is adopted as discussed in the ticket.




[GitHub] [cassandra] blerer commented on pull request #824: CASSANDRA-16143: Streaming fails when s SSTable writer finish() exceeds internode_tcp_user_timeout

Posted by GitBox <gi...@apache.org>.
blerer commented on pull request #824:
URL: https://github.com/apache/cassandra/pull/824#issuecomment-741760401


   Merged manually.

