Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2023/01/07 23:27:31 UTC

[GitHub] [cassandra] aratno commented on a diff in pull request #2066: Cassandra 18110

aratno commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1064057835


##########
test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamingStatsDisabledTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values()).set("streaming_stats_enabled", false))
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar, PRIMARY KEY (user_id));"));
+            cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
+
+            long expectedFiles = 10;
+            for (int i = 0; i < expectedFiles; i++)
+            {
+                cluster.get(1).executeInternal(withKeyspace("insert into %s.users(user_id) values (?)"), "dcapwell" + i);
+                cluster.get(1).flush(KEYSPACE);
+            }
+
+            cluster.get(2).nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success();
+            for (int nodeId : Arrays.asList(1, 2))
+                QueryResultUtil.assertThat(cluster.get(nodeId).executeInternalWithResult("SELECT * FROM system_views.streaming")).isEmpty();
+
+            // trigger streaming again
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.users(user_id) VALUES ('trigger streaming')"));
+            // mimic JMX
+            cluster.get(2).runOnInstance(() -> StreamManager.instance.setStreamingStatsEnabled(true));
+            cluster.get(2).nodetoolResult("repair", KEYSPACE).asserts().success();
+
+            QueryResultUtil.assertThat(cluster.get(1).executeInternalWithResult("SELECT * FROM system_views.streaming")).isEmpty();
+            QueryResultUtil.assertThat(cluster.get(2).executeInternalWithResult("SELECT * FROM system_views.streaming")).hasSize(1);

Review Comment:
   Will this flake if streaming finishes too quickly?



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -54,18 +57,14 @@ public class StreamingState implements StreamEventHandler
 
     private final long createdAtMillis = Clock.Global.currentTimeMillis();
 
-    // while streaming is running, this is a cache of StreamInfo seen with progress state
-    // the reason for the cache is that StreamSession drops data after tasks (recieve/send) complete, this makes
-    // it so that current state of a future tracks work pending rather than work done, cache solves this by not deleting
-    // when tasks complete
-    // To lower memory costs, clear this after the stream completes
-    private ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = new ConcurrentHashMap<>();
-
     private final TimeUUID id;
     private final boolean follower;
     private final StreamOperation operation;
-    private Set<InetSocketAddress> peers = null;
-    private Sessions sessions = Sessions.EMPTY;
+    private final Set<InetSocketAddress> peers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    @GuardedBy("this")
+    private final ObjectLongMap<Pair<InetAddressAndPort, String>> activeFiles = new ObjectLongHashMap<>();
+    @GuardedBy("this")
+    private final Sessions sessions = new Sessions();

Review Comment:
   Thinking out loud: this shouldn't cause a deadlock, since locking on `this` and `synchronized` methods both use the same reentrant intrinsic lock.
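
   A minimal sketch of the reentrancy argument, not the PR's code: `ReentrantCounter` and its members are hypothetical stand-ins for a class whose fields are guarded by `this`. It shows a `synchronized` method calling another `synchronized` method, plus a `synchronized (this)` block, all contending for one monitor that the owning thread may re-acquire.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a class whose fields are guarded by "this".
class ReentrantCounter
{
    private final Set<String> peers = new HashSet<>(); // guarded by "this"
    private long files = 0;                            // guarded by "this"

    // Acquires the monitor on "this", then calls another synchronized method.
    synchronized long addPeer(String peer)
    {
        peers.add(peer);
        return incrementFiles(); // re-enters the monitor this thread already holds
    }

    synchronized long incrementFiles()
    {
        return ++files;
    }

    long snapshot()
    {
        synchronized (this) // same monitor as the synchronized methods above
        {
            return files + peers.size();
        }
    }
}
```

   Because Java intrinsic locks are reentrant, the nested call in `addPeer` returns normally instead of blocking on itself. This says nothing about lock ordering against other objects, which would need a separate check.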



##########
src/java/org/apache/cassandra/config/Config.java:
##########
@@ -875,6 +875,8 @@ public static void setClientMode(boolean clientMode)
     public volatile DurationSpec.LongNanosecondsBound streaming_state_expires = new DurationSpec.LongNanosecondsBound("3d");
     public volatile DataStorageSpec.LongBytesBound streaming_state_size = new DataStorageSpec.LongBytesBound("40MiB");
 
+    public volatile boolean streaming_stats_enabled = true;

Review Comment:
   Can you add a comment about why someone might want to disable this, the potential conflict with TCP_USER_TIMEOUT (via internode_streaming_tcp_user_timeout), and the consequences of disabling it?



##########
test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamingStatsDisabledTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values()).set("streaming_stats_enabled", false))
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar, PRIMARY KEY (user_id));"));
+            cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
+
+            long expectedFiles = 10;
+            for (int i = 0; i < expectedFiles; i++)
+            {
+                cluster.get(1).executeInternal(withKeyspace("insert into %s.users(user_id) values (?)"), "dcapwell" + i);
+                cluster.get(1).flush(KEYSPACE);
+            }
+
+            cluster.get(2).nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success();
+            for (int nodeId : Arrays.asList(1, 2))
+                QueryResultUtil.assertThat(cluster.get(nodeId).executeInternalWithResult("SELECT * FROM system_views.streaming")).isEmpty();
+
+            // trigger streaming again
+            cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.users(user_id) VALUES ('trigger streaming')"));
+            // mimic JMX
+            cluster.get(2).runOnInstance(() -> StreamManager.instance.setStreamingStatsEnabled(true));
+            cluster.get(2).nodetoolResult("repair", KEYSPACE).asserts().success();

Review Comment:
   Why rebuild to trigger streaming the first time but repair the second time?



##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -158,6 +158,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
 
+    public enum PrepareType { SEND, ACK }

Review Comment:
   nit: calling this a "type" feels ambiguous to me; maybe PrepareDirection?



##########
src/java/org/apache/cassandra/streaming/StreamManagerMBean.java:
##########
@@ -29,4 +29,14 @@ public interface StreamManagerMBean extends NotificationEmitter
      * Returns the current state of all ongoing streams.
      */
     Set<CompositeData> getCurrentStreams();
+
+    /**
+     * @return weather or not the streaming virtual table should collect stats while streaming is running

Review Comment:
   nit: whether



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -357,41 +333,9 @@ public static String columns()
                    "  files_sent bigint, \n";
         }
 
-        public static Sessions create(Collection<SessionInfo> sessions)
-        {
-            long bytesToReceive = 0;
-            long bytesReceived = 0;
-            long filesToReceive = 0;
-            long filesReceived = 0;
-            long bytesToSend = 0;
-            long bytesSent = 0;
-            long filesToSend = 0;
-            long filesSent = 0;
-            for (SessionInfo session : sessions)
-            {
-                bytesToReceive += session.getTotalSizeToReceive();
-                bytesReceived += session.getTotalSizeReceived();
-
-                filesToReceive += session.getTotalFilesToReceive();
-                filesReceived += session.getTotalFilesReceived();
-
-                bytesToSend += session.getTotalSizeToSend();
-                bytesSent += session.getTotalSizeSent();
-
-                filesToSend += session.getTotalFilesToSend();
-                filesSent += session.getTotalFilesSent();
-            }
-            if (0 == bytesToReceive && 0 == bytesReceived && 0 == filesToReceive && 0 == filesReceived && 0 == bytesToSend && 0 == bytesSent && 0 == filesToSend && 0 == filesSent)
-                return EMPTY;
-            return new Sessions(bytesToReceive, bytesReceived,
-                                bytesToSend, bytesSent,
-                                filesToReceive, filesReceived,
-                                filesToSend, filesSent);
-        }
-
         public boolean isEmpty()
         {
-            return this == EMPTY;
+            return bytesToReceive == 0 && bytesToSend == 0 && filesToReceive == 0 && filesToSend == 0;

Review Comment:
   This would treat a Sessions as empty when it is actually complete rather than empty, since it ignores the already {sent,received} {files,bytes} counters.
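
   One possible shape for the check, as a sketch only: `SessionsSketch` is a hypothetical, simplified stand-in for `StreamingState.Sessions`, with field names borrowed from the removed `create` method. It assumes the "to" counters can be zero while the completed counters are not, and mirrors the eight-counter test that `create` used before returning `EMPTY`.

```java
// Hypothetical, simplified stand-in for StreamingState.Sessions.
class SessionsSketch
{
    long bytesToReceive, bytesReceived;
    long bytesToSend, bytesSent;
    long filesToReceive, filesReceived;
    long filesToSend, filesSent;

    // Empty only when no counter, pending or completed, has been touched.
    boolean isEmpty()
    {
        return bytesToReceive == 0 && bytesReceived == 0
            && bytesToSend == 0 && bytesSent == 0
            && filesToReceive == 0 && filesReceived == 0
            && filesToSend == 0 && filesSent == 0;
    }
}
```

   With this version, a session that has sent or received anything is never reported as empty, regardless of how much work remains.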



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

