Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2023/01/04 14:51:45 UTC

[GitHub] [cassandra] smiklosovic commented on a diff in pull request #2058: [CASSANDRA-16325] Update streaming metrics incrementally

smiklosovic commented on code in PR #2058:
URL: https://github.com/apache/cassandra/pull/2058#discussion_r1061550568


##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -1027,9 +1022,30 @@ public void receive(IncomingStreamMessage message)
     public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
     {
         ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total);
+        updateMetricsOnProgress(progress);
         streamResult.handleProgress(progress);
     }
 
+    private void updateMetricsOnProgress(ProgressInfo progress)
+    {
+        ProgressInfo.Direction direction = progress.direction;
+        long lastSeenBytesStreamedForProgress = lastSeenBytesStreamed.getOrDefault(progress, 0L);
+        long newBytesStreamed = progress.currentBytes - lastSeenBytesStreamedForProgress;
+        if (direction == ProgressInfo.Direction.OUT)
+        {
+            StreamingMetrics.totalOutgoingBytes.inc(newBytesStreamed);
+            metrics.outgoingBytes.inc(newBytesStreamed);
+        }
+

Review Comment:
   nit: redundant empty line
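For readers following the hunk: `currentBytes` appears to be the cumulative byte count for a file, so `updateMetricsOnProgress` adds only the difference from the last value it saw. Below is a minimal sketch of that delta accounting. It assumes a hypothetical `IncrementalByteCounter` class keyed by a `filename:direction` string, with a `LongAdder` standing in for the Cassandra metrics counters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the incremental update in updateMetricsOnProgress:
// each progress report carries the cumulative bytes for one file, so only the
// difference from the previous report is added to the counter.
class IncrementalByteCounter
{
    // Last cumulative byte count seen per transfer, keyed by "filename:direction".
    private final Map<String, Long> lastSeenBytes = new HashMap<>();
    // Stands in for a metrics counter such as StreamingMetrics.totalOutgoingBytes.
    private final LongAdder totalBytes = new LongAdder();

    void onProgress(String filename, String direction, long currentBytes)
    {
        String key = filename + ":" + direction;
        long lastSeen = lastSeenBytes.getOrDefault(key, 0L);
        long newBytes = currentBytes - lastSeen; // bytes streamed since the previous report
        totalBytes.add(newBytes);
        lastSeenBytes.put(key, currentBytes);
    }

    long totalBytes()
    {
        return totalBytes.sum();
    }

    int trackedTransfers()
    {
        return lastSeenBytes.size();
    }
}
```

Reporting 100, 250 and 400 bytes for the same file adds 100, 150 and 150, so the counter ends at 400 rather than the 750 a naive sum of reports would give. Like the hunk under review, this sketch never removes entries from the map.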



##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -1027,9 +1022,30 @@ public void receive(IncomingStreamMessage message)
     public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
     {
         ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total);
+        updateMetricsOnProgress(progress);
         streamResult.handleProgress(progress);
     }
 
+    private void updateMetricsOnProgress(ProgressInfo progress)
+    {
+        ProgressInfo.Direction direction = progress.direction;
+        long lastSeenBytesStreamedForProgress = lastSeenBytesStreamed.getOrDefault(progress, 0L);

Review Comment:
   Does this mean the `lastSeenBytesStreamed` map will only ever grow? I don't see any place where entries are removed from it.
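One way to bound the map is to evict the entry on the final report for a transfer. This sketch assumes that a transfer is finished once its reported bytes reach its total, and that no further reports arrive for it after that. `BoundedProgressTracker` and its methods are hypothetical names. Sessions that fail mid-transfer would still leave entries behind, so clearing the map when the session closes may also be needed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical tracker: same delta accounting as the hunk, but an entry is
// evicted on the final report, so the map holds only in-flight transfers.
// Assumes a transfer is complete once currentBytes >= totalBytes and that no
// further reports arrive for that key afterwards.
class BoundedProgressTracker
{
    private final Map<String, Long> lastSeenBytes = new ConcurrentHashMap<>();
    private final LongAdder streamedBytes = new LongAdder();

    void onProgress(String key, long currentBytes, long totalBytes)
    {
        boolean complete = currentBytes >= totalBytes;
        // remove() and put() both return the previous value, or null if there was none
        Long previous = complete ? lastSeenBytes.remove(key)
                                 : lastSeenBytes.put(key, currentBytes);
        long lastSeen = previous == null ? 0L : previous;
        streamedBytes.add(currentBytes - lastSeen);
    }

    long streamedBytes()
    {
        return streamedBytes.sum();
    }

    int trackedTransfers()
    {
        return lastSeenBytes.size();
    }
}
```

Folding the lookup into the `put`/`remove` call also avoids a separate `getOrDefault` followed by a write.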



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)

Review Comment:
   nit: missing space after `try`; it should be `try (Cluster`.



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> config.with(NETWORK, GOSSIP)
+                                                                      .set("stream_entire_sstables", false)
+                                                                      .set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> cluster.get(2).nodetool("repair", "--full"));
+        }
+    }
+
+    @Test
+    public void testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> config.with(NETWORK, GOSSIP)
+                                                                      .set("stream_entire_sstables", false)
+                                                                      .set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> cluster.get(2).nodetool("rebuild"));
+        }
+    }
+
+    /**
+     * Test to verify that streaming metrics are updated incrementally
+     * - Create 2 node cluster with RF=2
+     * - Create 1 sstable with 10MB on node1, while node2 is empty due to message drop
+     * - Run repair OR rebuild on node2 to transfer sstable from node1
+     * - Collect metrics during streaming and check that at least 3 different values are reported [0, partial1, .., final_size]
+     * - Check final transferred size is correct (~10MB bytes)
+     */
+    public void runStreamingOperationAndCheckIncrementalMetrics(Cluster cluster, Callable<Integer> streamingOperation) throws Exception
+    {
+        assertThat(cluster.size())
+            .describedAs("The minimum cluster size to check streaming metrics is 2 nodes.")
+            .isEqualTo(2);
+
+        // Create table with compression disabled so we can easily compute the expected final sstable size
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
+                                           "WITH compaction = {'class': '%s', 'enabled': 'false'} " +
+                                           "AND compression = {'enabled':'false'};",
+                                           KEYSPACE, "SizeTieredCompactionStrategy"));
+
+        // each row has 1KB payload
+        Random random = new Random(0);
+        StringBuilder random1kbString = new StringBuilder();
+        for (int i = 0; i < 1024; i++)
+            random1kbString.append((char)random.nextInt(127));
+
+        // Drop all messages from node1 to node2 so node2 will be empty
+        IMessageFilters.Filter drop1to2 = cluster.filters().verbs(MUTATION_REQ.id).from(1).to(2).drop();
+
+        final int totalRows = 10000; // total size: 10K x 1KB ~= 10MB
+        for (int i = 0; i < totalRows; ++i)
+        {
+            // write rows with timestamp 1 to have deterministic transfer size
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1) VALUES (?, ?) USING TIMESTAMP 1;"),
+                                           ConsistencyLevel.ONE,
+                                           Integer.toString(i),
+                                           random1kbString.toString());
+        }
+
+        // Flush and compact all nodes to generate a single sstable
+        cluster.forEach(i -> {
+            i.flush(KEYSPACE);
+            i.forceCompact(KEYSPACE, "cf");
+        });
+
+        // Check that node 1 only has 1 sstable after flush + compaction
+        assertThat(getNumberOfSSTables(cluster, 1)).isEqualTo(1);
+        // Node 2 should have 0 sstables since messages from node1 were dropped
+        assertThat(getNumberOfSSTables(cluster, 2)).isEqualTo(0);
+
+        // Disable dropping of messages from node1 to node2
+        drop1to2.off();
+
+        ExecutorService nodetoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

Review Comment:
   Shouldn't we shut down this executor once we are done?
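The following is a sketch of the shutdown pattern. It assumes the test submits `streamingOperation` to the executor and samples a metric until the operation completes. `MetricSampler`, `sampleWhileRunning` and the `LongSupplier` metric reader are hypothetical. `Executors.newSingleThreadExecutor()` stands in for the hand-built single-thread `ThreadPoolExecutor`, which behaves the same way for this purpose.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

class MetricSampler
{
    // Runs the operation on one background thread, records each distinct metric
    // value seen while it runs, and always shuts the executor down afterwards.
    static Set<Long> sampleWhileRunning(Callable<Integer> operation, LongSupplier metric, long pollMillis) throws Exception
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try
        {
            Set<Long> observed = new LinkedHashSet<>();
            Future<Integer> result = executor.submit(operation);
            while (!result.isDone())
            {
                observed.add(metric.getAsLong());
                Thread.sleep(pollMillis);
            }
            result.get();                     // rethrows any failure from the operation
            observed.add(metric.getAsLong()); // final value after completion
            return observed;
        }
        finally
        {
            // shutdownNow() also interrupts the task if we leave early on an exception
            executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```

Shutting down in `finally` releases the thread even when an assertion or the streaming operation itself fails.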



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> config.with(NETWORK, GOSSIP)
+                                                                      .set("stream_entire_sstables", false)
+                                                                      .set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> cluster.get(2).nodetool("repair", "--full"));
+        }
+    }
+
+    @Test
+    public void testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> config.with(NETWORK, GOSSIP)
+                                                                      .set("stream_entire_sstables", false)
+                                                                      .set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> cluster.get(2).nodetool("rebuild"));
+        }
+    }
+
+    /**
+     * Test to verify that streaming metrics are updated incrementally
+     * - Create 2 node cluster with RF=2
+     * - Create 1 sstable with 10MB on node1, while node2 is empty due to message drop
+     * - Run repair OR rebuild on node2 to transfer sstable from node1
+     * - Collect metrics during streaming and check that at least 3 different values are reported [0, partial1, .., final_size]
+     * - Check final transferred size is correct (~10MB bytes)
+     */
+    public void runStreamingOperationAndCheckIncrementalMetrics(Cluster cluster, Callable<Integer> streamingOperation) throws Exception
+    {
+        assertThat(cluster.size())
+            .describedAs("The minimum cluster size to check streaming metrics is 2 nodes.")
+            .isEqualTo(2);
+
+        // Create table with compression disabled so we can easily compute the expected final sstable size
+        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
+                                           "WITH compaction = {'class': '%s', 'enabled': 'false'} " +
+                                           "AND compression = {'enabled':'false'};",
+                                           KEYSPACE, "SizeTieredCompactionStrategy"));

Review Comment:
   I don't see why the `class` value is a placeholder when it is a constant. Why is it extracted?
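If the compaction class never varies, it can be written directly into the statement. Below is a sketch with a hypothetical `SchemaStatements.createTable` helper that keeps only the keyspace as a placeholder.

```java
class SchemaStatements
{
    // Hypothetical helper: the compaction class is inlined because it is constant;
    // only the keyspace remains a format placeholder.
    static String createTable(String keyspace)
    {
        return String.format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " +
                             "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} " +
                             "AND compression = {'enabled':'false'};",
                             keyspace);
    }
}
```

The `withKeyspace` helper already used for the INSERT statement in this test may serve the same purpose here, assuming it substitutes the keyspace for `%s`.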



##########
test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java:
##########
@@ -145,10 +155,159 @@ public void testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
         testMetricsWithStreamingToTwoNodes(true);
     }
 
-    private int getNumberOfSSTables(Cluster cluster, int node) {
+    @Test
+    public void testMetricsUpdateIncrementallyWithRepairAndStreamingBetweenNodes() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> config.with(NETWORK, GOSSIP)
+                                                                      .set("stream_entire_sstables", false)
+                                                                      .set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            runStreamingOperationAndCheckIncrementalMetrics(cluster, () -> cluster.get(2).nodetool("repair", "--full"));
+        }
+    }
+
+    @Test
+    public void testMetricsUpdateIncrementallyWithRebuildAndStreamingBetweenNodes() throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(2)

Review Comment:
   nit: missing space after `try`; it should be `try (Cluster`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: pr-help@cassandra.apache.org