Posted to commits@cassandra.apache.org by bl...@apache.org on 2021/04/09 14:25:49 UTC

[cassandra] branch trunk updated: Fix Streaming Repair metrics

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c4e4e93  Fix Streaming Repair metrics
c4e4e93 is described below

commit c4e4e9388b229a0db92c16029fd647909cec5737
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Wed Mar 24 17:23:51 2021 +0100

    Fix Streaming Repair metrics
    
    patch by Benjamin Lerer; reviewed by Ekaterina Dimitrova for
    CASSANDRA-16190
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/streaming/StreamSession.java  |  17 +-
 .../test/metrics/StreamingMetricsTest.java         | 402 +++++++++++++++++----
 3 files changed, 349 insertions(+), 71 deletions(-)

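In short: the patch stops incrementing the repair streaming counters up front when the session is prepared, and instead increments StreamingMetrics.totalOutgoingRepairBytes and StreamingMetrics.totalOutgoingRepairSSTables as each outgoing file message is sent, alongside the existing totalOutgoingBytes accounting. The minimal, self-contained sketch below illustrates the per-file accounting the fix adopts; RepairMetricsSketch, OutgoingFile and onFileSent are illustrative stand-ins, not Cassandra APIs:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class RepairMetricsSketch
    {
        // Illustrative stand-ins for the two StreamingMetrics counters touched by the patch.
        static final AtomicLong totalOutgoingRepairBytes = new AtomicLong();
        static final AtomicLong totalOutgoingRepairSSTables = new AtomicLong();

        enum StreamOperation { REPAIR, REBUILD }

        // Illustrative stand-in for one outgoing stream message.
        static final class OutgoingFile
        {
            final long estimatedSize;
            final int numFiles;

            OutgoingFile(long estimatedSize, int numFiles)
            {
                this.estimatedSize = estimatedSize;
                this.numFiles = numFiles;
            }
        }

        // After the fix, the repair counters are updated per file sent (mirroring
        // the new block in StreamSession below) instead of once per session from
        // the transfer summaries at prepare time.
        static void onFileSent(StreamOperation operation, OutgoingFile file)
        {
            if (StreamOperation.REPAIR == operation)
            {
                totalOutgoingRepairBytes.addAndGet(file.estimatedSize);
                totalOutgoingRepairSSTables.addAndGet(file.numFiles);
            }
        }

        public static void main(String[] args)
        {
            List<OutgoingFile> sent = Arrays.asList(new OutgoingFile(1024, 1),
                                                    new OutgoingFile(2048, 1));
            for (OutgoingFile file : sent)
                onFileSent(StreamOperation.REPAIR, file);

            // Prints: 3072 bytes, 2 sstables
            System.out.println(totalOutgoingRepairBytes.get() + " bytes, "
                               + totalOutgoingRepairSSTables.get() + " sstables");
        }
    }

Counting at send time means the repair metrics reflect what was actually streamed rather than what was merely planned, which is what the reworked distributed test below asserts.
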
diff --git a/CHANGES.txt b/CHANGES.txt
index 99c8c97..590a4a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-rc1
+ * Fix Streaming Repair metrics (CASSANDRA-16190)
  * Scheduled (delayed) schema pull tasks should not run after MIGRATION stage shutdown during decommission (CASSANDRA-16495)
  * When behind a firewall trunk is not buildable, need to allow overriding URLs (CASSANDRA-16563)
  * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a3ffef1..3a32834 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -615,21 +615,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         state(State.PREPARING);
         PrepareSynMessage prepare = new PrepareSynMessage();
         prepare.requests.addAll(requests);
-        long totalBytesToStream = 0;
-        long totalSSTablesStreamed = 0;
         for (StreamTransferTask task : transfers.values())
         {
-            totalBytesToStream += task.getTotalSize();
-            totalSSTablesStreamed += task.getTotalNumberOfFiles();
             prepare.summaries.add(task.getSummary());
         }
 
-        if(StreamOperation.REPAIR == getStreamOperation())
-        {
-            StreamingMetrics.totalOutgoingRepairBytes.inc(totalBytesToStream);
-            StreamingMetrics.totalOutgoingRepairSSTables.inc(totalSSTablesStreamed);
-        }
-
         messageSender.sendMessage(prepare);
     }
 
@@ -767,6 +757,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         long headerSize = message.stream.getEstimatedSize();
         StreamingMetrics.totalOutgoingBytes.inc(headerSize);
         metrics.outgoingBytes.inc(headerSize);
+
+        if (StreamOperation.REPAIR == getStreamOperation())
+        {
+            StreamingMetrics.totalOutgoingRepairBytes.inc(headerSize);
+            StreamingMetrics.totalOutgoingRepairSSTables.inc(message.stream.getNumFiles());
+        }
+
         // schedule timeout for receiving ACK
         StreamTransferTask task = transfers.get(message.header.tableId);
         if (task != null)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
index 22449e8..a4e4adc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
@@ -18,105 +18,385 @@
 
 package org.apache.cassandra.distributed.test.metrics;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.StreamingMetrics;
 
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
 public class StreamingMetricsTest extends TestBaseImpl
 {
-    private static Cluster cluster;
 
-    @BeforeClass
-    public static void setupCluster() throws IOException
-    {
-        cluster = Cluster.build().withNodes(2)
-                         .withDataDirCount(1)
-                         .withConfig(config -> config.with(NETWORK)
-                                                     .set("stream_entire_sstables", false))
-                         .start();
-        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
-    }
-
-    private static InetAddressAndPort getNodeAddress(int num)
+    private static InetAddressAndPort getNodeAddress(Cluster cluster, int num)
     {
         InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
         return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
                                                                broadcastAddress.getPort());
     }
 
-    @AfterClass
-    public static void teardownCluster()
+    @Test
+    public void testMetricsWithRepairAndStreamingFromTwoNodes() throws Exception
     {
-        if (cluster != null)
-            cluster.close();
+        testMetricsWithStreamingFromTwoNodes(true);
     }
 
+    @Test
+    public void testMetricsWithRebuildAndStreamingFromTwoNodes() throws Exception
+    {
+        testMetricsWithStreamingFromTwoNodes(false);
+    }
+
+    public void testMetricsWithStreamingFromTwoNodes(boolean useRepair) throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(3)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> config.with(NETWORK)
+                                                                      .set("stream_entire_sstables", false)
+                                                                      .set("hinted_handoff_enabled", false))
+                                          .start(), 2))
+        {
+            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+            cluster.get(3).shutdown().get(10, TimeUnit.SECONDS);
+
+            final int rowsPerFile = 500;
+            final int files = 5;
+            for (int k = 0; k < files; k++)
+            {
+                for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                {
+                    cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"),
+                                                   ConsistencyLevel.ONE,
+                                                   Integer.toString(i));
+                }
+                cluster.get(1).nodetool("flush");
+                cluster.get(2).nodetool("flush");
+            }
+
+            cluster.get(3).startup();
+
+            // Checks that the table is empty on node 3
+            Object[][] results = cluster.get(3).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+            assertThat(results.length).isEqualTo(0);
+
+            checkThatNoStreamingOccurredBetweenTheThreeNodes(cluster);
+
+            // Trigger streaming from node 3
+            if (useRepair)
+                cluster.get(3).nodetool("repair", "--full");
+            else
+                cluster.get(3).nodetool("rebuild", "--keyspace", KEYSPACE);
+
+
+            // Check streaming metrics on node 1
+            checkThatNoStreamingOccurred(cluster, 1, 2);
+            long bytesFrom1 = checkDataSent(cluster, 1, 3);
+            checkDataReceived(cluster, 1, 3, 0, 0);
+
+            if (useRepair)
+                checkTotalDataSent(cluster, 1, bytesFrom1, bytesFrom1, files);
+            else
+                checkTotalDataSent(cluster, 1, bytesFrom1, 0, 0);
+
+            checkTotalDataReceived(cluster, 1, 0);
+
+            // Check streaming metrics on node 2
+            checkThatNoStreamingOccurred(cluster, 2, 1);
+            long bytesFrom2 = checkDataSent(cluster, 2, 3);
+            checkDataReceived(cluster, 2, 3, 0, 0);
+
+            if (useRepair)
+                checkTotalDataSent(cluster, 2, bytesFrom2, bytesFrom2, files);
+            else
+                checkTotalDataSent(cluster, 2, bytesFrom2, 0, 0);
+
+            checkTotalDataReceived(cluster, 2, 0);
+
+            // Check streaming metrics on node 3
+            checkDataReceived(cluster, 3, 1, bytesFrom1, files);
+            checkDataReceived(cluster, 3, 2, bytesFrom2, files);
+            checkTotalDataSent(cluster, 3, 0, 0, 0);
+            checkTotalDataReceived(cluster, 3, bytesFrom1 + bytesFrom2);
+        }
+    }
 
     @Test
-    public void testStreamMetrics()
+    public void testMetricsWithRebuildAndStreamingToTwoNodes() throws Exception
     {
-        cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+        testMetricsWithStreamingToTwoNodes(false);
+    }
+
+    @Test
+    public void testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
+    {
+        testMetricsWithStreamingToTwoNodes(true);
+    }
 
-        final int rowsPerFile = 500;
-        final int files = 5;
-        for (int k = 0; k < files; k++)
+    public void testMetricsWithStreamingToTwoNodes(boolean useRepair) throws Exception
+    {
+        try(Cluster cluster = init(Cluster.build(3)
+                                          .withDataDirCount(1)
+                                          .withConfig(config -> config.with(NETWORK)
+                                                                      .set("stream_entire_sstables", false)
+                                                                      .set("hinted_handoff_enabled", false))
+                                          .start(), 2))
         {
-            for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
-                cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), Integer.toString(i));
-            cluster.get(1).nodetool("flush");
+            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+            final int rowsPerFile = 500;
+            final int files = 5;
+
+            cluster.get(3).shutdown().get(10, TimeUnit.SECONDS);
+            cluster.get(1).nodetool("disableautocompaction", KEYSPACE);
+            cluster.get(2).nodetool("disableautocompaction", KEYSPACE);
+
+            int sstablesInitiallyOnNode2 = 0;
+            int sstablesInitiallyOnNode3 = 0;
+
+            for (int k = 0; k < 3; k++)
+            {
+                for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                {
+                    cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"),
+                                                   ConsistencyLevel.ONE,
+                                                   Integer.toString(i));
+                }
+                cluster.get(1).nodetool("flush");
+                cluster.get(2).nodetool("flush");
+                sstablesInitiallyOnNode2++;
+            }
+
+            cluster.get(3).startup();
+            cluster.get(3).nodetool("disableautocompaction", KEYSPACE);
+
+            cluster.get(2).shutdown().get(10, TimeUnit.SECONDS);
+
+            for (int k = 3; k < files; k++)
+            {
+                for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+                {
+                    cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"),
+                                                   ConsistencyLevel.ONE,
+                                                   Integer.toString(i));
+                }
+                cluster.get(1).nodetool("flush");
+                cluster.get(3).nodetool("flush");
+                sstablesInitiallyOnNode3++;
+            }
+
+            cluster.get(2).startup();
+            cluster.get(2).nodetool("disableautocompaction", KEYSPACE);
+
+            checkThatNoStreamingOccurredBetweenTheThreeNodes(cluster);
+
+            // Trigger streaming from node 3 and node 2
+
+            long bytesFrom2To1;
+            int sstablesFrom2To1;
+
+            long bytesFrom3To1;
+            int sstablesFrom3To1;
+
+            int sstablesFrom3To2;
+            int sstablesFrom2To3;
+
+            if (useRepair)
+            {
+                cluster.get(3).nodetool("repair", "--full");
+                cluster.get(2).nodetool("repair", "--full");
+
+                bytesFrom2To1 = checkDataSent(cluster, 2, 1);
+                sstablesFrom2To1 = sstablesInitiallyOnNode2;
+
+                bytesFrom3To1 = checkDataSent(cluster, 3, 1);
+                sstablesFrom3To1 = sstablesInitiallyOnNode3;
+
+                sstablesFrom2To3 = sstablesInitiallyOnNode2;
+                sstablesFrom3To2 = sstablesInitiallyOnNode3;
+            }
+            else
+            {
+                cluster.get(3).nodetool("rebuild", "--keyspace", KEYSPACE);
+                cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE);
+
+                bytesFrom2To1 = 0;
+                sstablesFrom2To1 = 0;
+
+                bytesFrom3To1 = 0;
+                sstablesFrom3To1 = 0;
+
+                sstablesFrom2To3 = sstablesInitiallyOnNode2;
+                sstablesFrom3To2 = sstablesInitiallyOnNode3 + sstablesInitiallyOnNode2;
+            }
+
+            // Check streaming metrics on node 1
+            long bytesFrom1To2 = checkDataSent(cluster, 1, 2);
+            long bytesFrom1To3 = checkDataSent(cluster, 1, 3);
+
+            long totalBytesSentFrom1 = bytesFrom1To2 + bytesFrom1To3;
+
+            if (useRepair)
+                checkTotalDataSent(cluster, 1, totalBytesSentFrom1, totalBytesSentFrom1, 10);
+            else
+                checkTotalDataSent(cluster, 1, totalBytesSentFrom1, 0, 0);
+
+            checkDataReceived(cluster, 1, 2, bytesFrom2To1, sstablesFrom2To1);
+            checkDataReceived(cluster, 1, 3, bytesFrom3To1, sstablesFrom3To1);
+            checkTotalDataReceived(cluster, 1, bytesFrom2To1 + bytesFrom3To1);
+
+            // Check streaming metrics on node 2 and 3
+            long bytesFrom2To3 = checkDataSent(cluster, 2, 3);
+            long bytesFrom3To2 = checkDataSent(cluster, 3, 2);
+
+            long totalBytesReceivedBy2 = bytesFrom1To2 + bytesFrom3To2;
+
+            checkDataReceived(cluster, 2, 1, bytesFrom1To2, files);
+            checkDataReceived(cluster, 2, 3, bytesFrom3To2, sstablesFrom3To2);
+
+            if (useRepair)
+                checkTotalDataSent(cluster, 2, bytesFrom2To3 + bytesFrom2To1, bytesFrom2To3 + bytesFrom2To1, sstablesFrom2To3 + sstablesFrom2To1);
+            else
+                checkTotalDataSent(cluster, 2, bytesFrom2To3, 0, 0);
+
+            checkTotalDataReceived(cluster, 2, totalBytesReceivedBy2);
+
+            long totalBytesReceivedBy3 = bytesFrom1To3 + bytesFrom2To3;
+
+            checkDataReceived(cluster, 3, 1, bytesFrom1To3, files);
+            checkDataReceived(cluster, 3, 2, bytesFrom2To3, sstablesFrom2To3);
+
+            if (useRepair)
+                checkTotalDataSent(cluster, 3, bytesFrom3To2 + bytesFrom3To1, bytesFrom3To2 + bytesFrom3To1, sstablesFrom3To2 + sstablesFrom3To1);
+            else
+                checkTotalDataSent(cluster, 3, bytesFrom3To2, 0, 0);
+
+            checkTotalDataReceived(cluster, 3, totalBytesReceivedBy3);
         }
+    }
 
-        cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
-        Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
-        assertThat(results.length).isEqualTo(0);
+    private void checkThatNoStreamingOccurredBetweenTheThreeNodes(Cluster cluster)
+    {
+        checkThatNoStreamingOccurred(cluster, 1, 2);
+        checkThatNoStreamingOccurred(cluster, 1, 3);
+        checkTotalDataSent(cluster, 1, 0, 0, 0);
+        checkTotalDataReceived(cluster, 1, 0);
 
-        InetAddressAndPort node1Address = getNodeAddress(1);
-        InetAddressAndPort node2Address = getNodeAddress(2);
+        checkThatNoStreamingOccurred(cluster, 2, 1);
+        checkThatNoStreamingOccurred(cluster, 2, 3);
+        checkTotalDataSent(cluster, 2, 0, 0, 0);
+        checkTotalDataReceived(cluster, 2, 0);
 
-        // Trigger streaming from node 2
-        cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE);
+        checkThatNoStreamingOccurred(cluster, 3, 1);
+        checkThatNoStreamingOccurred(cluster, 3, 2);
+        checkTotalDataSent(cluster, 3, 0, 0, 0);
+        checkTotalDataReceived(cluster, 3, 0);
+    }
+
+    private void checkThatNoStreamingOccurred(Cluster cluster, int node, int peer)
+    {
+        InetAddressAndPort address = getNodeAddress(cluster, peer);
+        cluster.get(node).runOnInstance(() -> {
+
+            StreamingMetrics metrics = StreamingMetrics.get(address);
 
-        // Assert metrics in node 2
-        long transmittedBytes = cluster.get(2).callOnInstance(() -> {
-            StreamingMetrics metrics = StreamingMetrics.get(node1Address);
             assertThat(metrics.incomingBytes.getCount())
-                .isGreaterThan(0)
-                .describedAs("There should be bytes streamed from the peer.");
+                .describedAs("No data should have been streamed so far from node" + peer + " to node" + node)
+                .isEqualTo(0);
+
             assertThat(metrics.outgoingBytes.getCount())
-                .isEqualTo(0)
-                .describedAs("There should not be sstables streamed to the peer.");
+                .describedAs("No data should have been streamed so far from node" + node + " to node" + peer)
+                .isEqualTo(0);
+
             assertThat(metrics.incomingProcessTime.getCount())
-                .isEqualTo(files)
-                .describedAs("There should be " + files + " files streamed from the peer.");
-            assertThat(metrics.incomingProcessTime.getSnapshot().getMedian())
-                .isGreaterThan(0)
-                .describedAs("The median processing time should be non-0");
-            return metrics.incomingBytes.getCount();
+                .describedAs("No SSTable should have been received so far by node" + node + " from node" + peer)
+                .isEqualTo(0);
         });
+    }
 
-        // Assert metrics in node 1
-        cluster.get(1).runOnInstance(() -> {
-            StreamingMetrics metrics = StreamingMetrics.get(node2Address);
-            assertThat(metrics.incomingBytes.getCount())
-                .isEqualTo(0).describedAs("There should not be sstables streamed from the peer.");
-            assertThat(metrics.outgoingBytes.getCount())
-                .isEqualTo(transmittedBytes)
-                .describedAs("The outgoingBytes count in node1 should be equals to incomingBytes count in node2");
-            assertThat(metrics.incomingProcessTime.getCount())
-                .isEqualTo(0)
-                .describedAs("There should be no files streamed from the peer.");
+    private long checkDataSent(Cluster cluster, int node, int peer)
+    {
+        InetAddressAndPort address = getNodeAddress(cluster, peer);
+        return cluster.get(node).callOnInstance(() -> {
+
+            StreamingMetrics metrics = StreamingMetrics.get(address);
+
+            long outgoingBytes = metrics.outgoingBytes.getCount();
+            assertThat(outgoingBytes)
+                .describedAs("There should be data streamed from node" + node + " to node" + peer)
+                .isGreaterThan(0);
+
+            return outgoingBytes;
         });
     }
+
+    private void checkDataReceived(Cluster cluster, int node, int peer, long receivedBytes, int files)
+    {
+        InetAddressAndPort address = getNodeAddress(cluster, peer);
+        cluster.get(node).runOnInstance(() -> {
+
+            StreamingMetrics metrics = StreamingMetrics.get(address);
+
+            long actual = metrics.incomingBytes.getCount();
+            assertThat(actual)
+                .describedAs("The amount of data received by node" + node + " from node" + peer + " does not match the expected value. [expected: " + receivedBytes + ", actual: " + actual + "]")
+                .isEqualTo(receivedBytes);
+
+            actual = metrics.incomingProcessTime.getCount();
+            // The incomingProcessTime timer is updated for each incoming file. As a consequence, incomingProcessTime.getCount() should be equal to the number of files received by the node.
+            assertThat(actual)
+                .describedAs("The number of files received by node" + node + " from node" + peer + " does not match the expected value. [expected: " + files + ", actual: " + actual + "]")
+                .isEqualTo(files);
+
+            if (metrics.incomingProcessTime.getCount() != 0)
+            {
+                assertThat(metrics.incomingProcessTime.getSnapshot().getMedian())
+                    .describedAs("The median processing time for data streamed from node" + peer + " to node" + node + " should be non-zero")
+                    .isGreaterThan(0);
+            }
+        });
+    }
+
+    private void checkTotalDataSent(Cluster cluster,
+                                    int node,
+                                    long outgoingBytes,
+                                    long outgoingRepairBytes,
+                                    long outgoingRepairSSTables)
+    {
+        cluster.get(node).runOnInstance(() -> {
+
+            long actual = StreamingMetrics.totalOutgoingBytes.getCount();
+            assertThat(actual)
+                .describedAs("The total amount of data sent by node" + node + " does not match the expected value. [expected: " + outgoingBytes + ", actual: " + actual + "]")
+                .isEqualTo(outgoingBytes);
+
+            actual = StreamingMetrics.totalOutgoingRepairBytes.getCount();
+            assertThat(actual)
+                .describedAs("The total amount of data sent by node" + node + " for repair does not match the expected value. [expected: " + outgoingRepairBytes + ", actual: " + actual + "]")
+                .isEqualTo(outgoingRepairBytes);
+
+            actual = StreamingMetrics.totalOutgoingRepairSSTables.getCount();
+            assertThat(actual)
+                .describedAs("The total number of SSTables sent by node" + node + " for repair does not match the expected value. [expected: " + outgoingRepairSSTables + ", actual: " + actual + "]")
+                .isEqualTo(outgoingRepairSSTables);
+        });
+    }
+
+    private void checkTotalDataReceived(Cluster cluster, int node, long incomingBytes)
+    {
+        cluster.get(node).runOnInstance(() -> {
+
+            long actual = StreamingMetrics.totalIncomingBytes.getCount();
+            assertThat(actual)
+                .describedAs("The total amount of data received by node" + node + " does not match the expected value. [expected: " + incomingBytes + ", actual: " + actual + "]")
+                .isEqualTo(incomingBytes);
+        });
+    }
 }
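
One consequence of incrementing the repair counters right next to totalOutgoingBytes in StreamSession is that, on this code path, repair bytes can never exceed a node's total outgoing bytes. Below is a small sketch of an additional helper in the same style as the test above, assuming the same imports as StreamingMetricsTest; it is not part of this commit:

    // Hedged sketch (not in the patch): asserts that the node-wide repair
    // counter stays within the node-wide total, which follows from the two
    // counters being incremented together in StreamSession.
    private void checkRepairBytesWithinTotal(Cluster cluster, int node)
    {
        cluster.get(node).runOnInstance(() -> {
            // Both counters are node-wide totals, so no peer address is needed.
            long total = StreamingMetrics.totalOutgoingBytes.getCount();
            long repair = StreamingMetrics.totalOutgoingRepairBytes.getCount();

            assertThat(repair)
                .describedAs("Repair bytes sent by node" + node + " should not exceed its total outgoing bytes")
                .isLessThanOrEqualTo(total);
        });
    }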
