You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2021/04/09 14:25:49 UTC
[cassandra] branch trunk updated: Fix Streaming Repair metrics
This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c4e4e93 Fix Streaming Repair metrics
c4e4e93 is described below
commit c4e4e9388b229a0db92c16029fd647909cec5737
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Wed Mar 24 17:23:51 2021 +0100
Fix Streaming Repair metrics
patch by Benjamin Lerer; reviewed by Ekaterina Dimitrova for
CASSANDRA-16190
---
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamSession.java | 17 +-
.../test/metrics/StreamingMetricsTest.java | 402 +++++++++++++++++----
3 files changed, 349 insertions(+), 71 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 99c8c97..590a4a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-rc1
+ * Fix Streaming Repair metrics (CASSANDRA-16190)
* Scheduled (delayed) schema pull tasks should not run after MIGRATION stage shutdown during decommission (CASSANDRA-16495)
* When behind a firewall trunk is not buildable, need to allow overriding URLs (CASSANDRA-16563)
* Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a3ffef1..3a32834 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -615,21 +615,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
state(State.PREPARING);
PrepareSynMessage prepare = new PrepareSynMessage();
prepare.requests.addAll(requests);
- long totalBytesToStream = 0;
- long totalSSTablesStreamed = 0;
for (StreamTransferTask task : transfers.values())
{
- totalBytesToStream += task.getTotalSize();
- totalSSTablesStreamed += task.getTotalNumberOfFiles();
prepare.summaries.add(task.getSummary());
}
- if(StreamOperation.REPAIR == getStreamOperation())
- {
- StreamingMetrics.totalOutgoingRepairBytes.inc(totalBytesToStream);
- StreamingMetrics.totalOutgoingRepairSSTables.inc(totalSSTablesStreamed);
- }
-
messageSender.sendMessage(prepare);
}
@@ -767,6 +757,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber
long headerSize = message.stream.getEstimatedSize();
StreamingMetrics.totalOutgoingBytes.inc(headerSize);
metrics.outgoingBytes.inc(headerSize);
+
+ if(StreamOperation.REPAIR == getStreamOperation())
+ {
+ StreamingMetrics.totalOutgoingRepairBytes.inc(headerSize);
+ StreamingMetrics.totalOutgoingRepairSSTables.inc(message.stream.getNumFiles());
+ }
+
// schedule timeout for receiving ACK
StreamTransferTask task = transfers.get(message.header.tableId);
if (task != null)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
index 22449e8..a4e4adc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java
@@ -18,105 +18,385 @@
package org.apache.cassandra.distributed.test.metrics;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.StreamingMetrics;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
public class StreamingMetricsTest extends TestBaseImpl
{
- private static Cluster cluster;
- @BeforeClass
- public static void setupCluster() throws IOException
- {
- cluster = Cluster.build().withNodes(2)
- .withDataDirCount(1)
- .withConfig(config -> config.with(NETWORK)
- .set("stream_entire_sstables", false))
- .start();
- cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
- }
-
- private static InetAddressAndPort getNodeAddress(int num)
+ private static InetAddressAndPort getNodeAddress(Cluster cluster, int num)
{
InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress();
return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(),
broadcastAddress.getPort());
}
- @AfterClass
- public static void teardownCluster()
+ @Test
+ public void testMetricsWithRepairAndStreamingFromTwoNodes() throws Exception
{
- if (cluster != null)
- cluster.close();
+ testMetricsWithStreamingFromTwoNodes(true);
}
+ @Test
+ public void testMetricsWithRebuildAndStreamingFromTwoNodes() throws Exception
+ {
+ testMetricsWithStreamingFromTwoNodes(false);
+ }
+
+ public void testMetricsWithStreamingFromTwoNodes(boolean useRepair) throws Exception
+ {
+ try(Cluster cluster = init(Cluster.build(3)
+ .withDataDirCount(1)
+ .withConfig(config -> config.with(NETWORK)
+ .set("stream_entire_sstables", false)
+ .set("hinted_handoff_enabled", false))
+ .start(), 2))
+ {
+ cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+ cluster.get(3).shutdown().get(10, TimeUnit.SECONDS);
+
+ final int rowsPerFile = 500;
+ final int files = 5;
+ for (int k = 0; k < files; k++)
+ {
+ for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+ {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"),
+ ConsistencyLevel.ONE,
+ Integer.toString(i));
+ }
+ cluster.get(1).nodetool("flush");
+ cluster.get(2).nodetool("flush");
+ }
+
+ cluster.get(3).startup();
+
+ // Checks that the table is empty on node 3
+ Object[][] results = cluster.get(3).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
+ assertThat(results.length).isEqualTo(0);
+
+ checkThatNoStreamingOccuredBetweenTheThreeNodes(cluster);
+
+ // Trigger streaming from node 3
+ if (useRepair)
+ cluster.get(3).nodetool("repair", "--full");
+ else
+ cluster.get(3).nodetool("rebuild", "--keyspace", KEYSPACE);
+
+
+ // Check streaming metrics on node 1
+ checkThatNoStreamingOccured(cluster, 1, 2);
+ long bytesFrom1 = checkDataSent(cluster, 1, 3);
+ checkDataReceived(cluster, 1, 3, 0, 0);
+
+ if (useRepair)
+ checkTotalDataSent(cluster, 1, bytesFrom1, bytesFrom1, files);
+ else
+ checkTotalDataSent(cluster, 1, bytesFrom1, 0, 0);
+
+ checkTotalDataReceived(cluster, 1, 0);
+
+ // Check streaming metrics on node 2
+ checkThatNoStreamingOccured(cluster, 2, 1);
+ long bytesFrom2 = checkDataSent(cluster, 2, 3);
+ checkDataReceived(cluster, 1, 2, 0, 0);
+
+ if (useRepair)
+ checkTotalDataSent(cluster, 2, bytesFrom2, bytesFrom2, files);
+ else
+ checkTotalDataSent(cluster, 2, bytesFrom2, 0, 0);
+
+ checkTotalDataReceived(cluster, 2, 0);
+
+ // Check streaming metrics on node 3
+ checkDataReceived(cluster, 3, 1, bytesFrom1, files);
+ checkDataReceived(cluster, 3, 2, bytesFrom2, files);
+ checkTotalDataSent(cluster, 3, 0, 0, 0);
+ checkTotalDataReceived(cluster, 3, bytesFrom1 + bytesFrom2);
+ }
+ }
@Test
- public void testStreamMetrics()
+ public void testMetricsWithRebuildAndStreamingToTwoNodes() throws Exception
{
- cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+ testMetricsWithStreamingToTwoNodes(false);
+ }
+
+ @Test
+ public void testMetricsWithRepairAndStreamingToTwoNodes() throws Exception
+ {
+ testMetricsWithStreamingToTwoNodes(true);
+ }
- final int rowsPerFile = 500;
- final int files = 5;
- for (int k = 0; k < files; k++)
+ public void testMetricsWithStreamingToTwoNodes(boolean useRepair) throws Exception
+ {
+ try(Cluster cluster = init(Cluster.build(3)
+ .withDataDirCount(1)
+ .withConfig(config -> config.with(NETWORK)
+ .set("stream_entire_sstables", false)
+ .set("hinted_handoff_enabled", false))
+ .start(), 2))
{
- for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
- cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), Integer.toString(i));
- cluster.get(1).nodetool("flush");
+ cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy"));
+
+ final int rowsPerFile = 500;
+ final int files = 5;
+
+ cluster.get(3).shutdown().get(10, TimeUnit.SECONDS);
+ cluster.get(1).nodetool("disableautocompaction", KEYSPACE);
+ cluster.get(2).nodetool("disableautocompaction", KEYSPACE);
+
+ int sstablesInitiallyOnNode2 = 0;
+ int sstablesInitiallyOnNode3 = 0;
+
+ for (int k = 0; k < 3; k++)
+ {
+ for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+ {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"),
+ ConsistencyLevel.ONE,
+ Integer.toString(i));
+ }
+ cluster.get(1).nodetool("flush");
+ cluster.get(2).nodetool("flush");
+ sstablesInitiallyOnNode2++;
+ }
+
+ cluster.get(3).startup();
+ cluster.get(3).nodetool("disableautocompaction", KEYSPACE);
+
+ cluster.get(2).shutdown().get(10, TimeUnit.SECONDS);
+
+ for (int k = 3; k < files; k++)
+ {
+ for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i)
+ {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"),
+ ConsistencyLevel.ONE,
+ Integer.toString(i));
+ }
+ cluster.get(1).nodetool("flush");
+ cluster.get(3).nodetool("flush");
+ sstablesInitiallyOnNode3++;
+ }
+
+ cluster.get(2).startup();
+ cluster.get(2).nodetool("disableautocompaction", KEYSPACE);
+
+ checkThatNoStreamingOccuredBetweenTheThreeNodes(cluster);
+
+ // Trigger streaming from node 3 and node 2
+
+ long bytesFrom2To1;
+ int sstablesFrom2To1;
+
+ long bytesFrom3To1;
+ int sstablesFrom3To1;
+
+ int sstablesFrom3To2;
+ int sstablesFrom2To3;
+
+ if (useRepair)
+ {
+ cluster.get(3).nodetool("repair", "--full");
+ cluster.get(2).nodetool("repair", "--full");
+
+ bytesFrom2To1 = checkDataSent(cluster, 2, 1);
+ sstablesFrom2To1 = sstablesInitiallyOnNode2;
+
+ bytesFrom3To1 = checkDataSent(cluster, 3, 1) ;
+ sstablesFrom3To1 = sstablesInitiallyOnNode3;
+
+ sstablesFrom2To3 = sstablesInitiallyOnNode2;
+ sstablesFrom3To2 = sstablesInitiallyOnNode3;
+ }
+ else
+ {
+ cluster.get(3).nodetool("rebuild", "--keyspace", KEYSPACE);
+ cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE);
+
+ bytesFrom2To1 = 0;
+ sstablesFrom2To1 = 0;
+
+ bytesFrom3To1 = 0;
+ sstablesFrom3To1 = 0;
+
+ sstablesFrom2To3 = sstablesInitiallyOnNode2;
+ sstablesFrom3To2 = sstablesInitiallyOnNode3 + sstablesInitiallyOnNode2;
+ }
+
+ // Check streaming metrics on node 1
+ long bytesFrom1To2 = checkDataSent(cluster, 1, 2);
+ long bytesFrom1To3 = checkDataSent(cluster, 1, 3);
+
+ long totalBytesSentFrom1 = bytesFrom1To2 + bytesFrom1To3;
+
+ if (useRepair)
+ checkTotalDataSent(cluster, 1, totalBytesSentFrom1, totalBytesSentFrom1, 10);
+ else
+ checkTotalDataSent(cluster, 1, totalBytesSentFrom1, 0, 0);
+
+ checkDataReceived(cluster, 1, 2, bytesFrom2To1, sstablesFrom2To1);
+ checkDataReceived(cluster, 1, 3, bytesFrom3To1, sstablesFrom3To1);
+ checkTotalDataReceived(cluster, 1, bytesFrom2To1 + bytesFrom3To1);
+
+ // Check streaming metrics on node 2 and 3
+ long bytesFrom2To3 = checkDataSent(cluster, 2, 3);
+ long bytesFrom3To2 = checkDataSent(cluster, 3, 2);
+
+ long totalBytesReceivedBy2 = bytesFrom1To2 + bytesFrom3To2;
+
+ checkDataReceived(cluster, 2, 1, bytesFrom1To2, files);
+ checkDataReceived(cluster, 2, 3, bytesFrom3To2, sstablesFrom3To2);
+
+ if (useRepair)
+ checkTotalDataSent(cluster, 2, bytesFrom2To3 + bytesFrom2To1, bytesFrom2To3 + bytesFrom2To1, sstablesFrom2To3 + sstablesFrom2To1);
+ else
+ checkTotalDataSent(cluster, 2, bytesFrom2To3, 0, 0);
+
+ checkTotalDataReceived(cluster, 2, totalBytesReceivedBy2);
+
+ long totalBytesReceivedBy3 = bytesFrom1To3 + bytesFrom2To3;
+
+ checkDataReceived(cluster, 3, 1, bytesFrom1To3, files);
+ checkDataReceived(cluster, 3, 2, bytesFrom2To3, sstablesFrom2To3);
+
+ if (useRepair)
+ checkTotalDataSent(cluster, 3, bytesFrom3To2 + bytesFrom3To1, bytesFrom3To2 + bytesFrom3To1, sstablesFrom3To2 + sstablesFrom3To1);
+ else
+ checkTotalDataSent(cluster, 3, bytesFrom3To2, 0, 0);
+
+ checkTotalDataReceived(cluster, 3, totalBytesReceivedBy3);
}
+ }
- cluster.get(2).executeInternal("TRUNCATE system.available_ranges;");
- Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
- assertThat(results.length).isEqualTo(0);
+ private void checkThatNoStreamingOccuredBetweenTheThreeNodes(Cluster cluster)
+ {
+ checkThatNoStreamingOccured(cluster, 1, 2);
+ checkThatNoStreamingOccured(cluster, 1, 3);
+ checkTotalDataSent(cluster, 1, 0, 0, 0);
+ checkTotalDataReceived(cluster, 1, 0);
- InetAddressAndPort node1Address = getNodeAddress(1);
- InetAddressAndPort node2Address = getNodeAddress(2);
+ checkThatNoStreamingOccured(cluster, 2, 1);
+ checkThatNoStreamingOccured(cluster, 2, 3);
+ checkTotalDataSent(cluster, 2, 0, 0, 0);
+ checkTotalDataReceived(cluster, 2, 0);
- // Trigger streaming from node 2
- cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE);
+ checkThatNoStreamingOccured(cluster, 3, 1);
+ checkThatNoStreamingOccured(cluster, 3, 2);
+ checkTotalDataSent(cluster, 3, 0, 0, 0);
+ checkTotalDataReceived(cluster, 3, 0);
+ }
+
+ private void checkThatNoStreamingOccured(Cluster cluster, int node, int peer)
+ {
+ InetAddressAndPort address = getNodeAddress(cluster, peer);
+ cluster.get(node).runOnInstance(() -> {
+
+ StreamingMetrics metrics = StreamingMetrics.get(address);
- // Assert metrics in node 2
- long transmittedBytes = cluster.get(2).callOnInstance(() -> {
- StreamingMetrics metrics = StreamingMetrics.get(node1Address);
assertThat(metrics.incomingBytes.getCount())
- .isGreaterThan(0)
- .describedAs("There should be bytes streamed from the peer.");
+ .describedAs("No SSTable should have been streamed so far from node" + node + " to node" + peer)
+ .isEqualTo(0);
+
assertThat(metrics.outgoingBytes.getCount())
- .isEqualTo(0)
- .describedAs("There should not be sstables streamed to the peer.");
+ .describedAs("No SSTable should have been streamed so far from node" + node + " to node" + peer)
+ .isEqualTo(0);
+
assertThat(metrics.incomingProcessTime.getCount())
- .isEqualTo(files)
- .describedAs("There should be " + files + " files streamed from the peer.");
- assertThat(metrics.incomingProcessTime.getSnapshot().getMedian())
- .isGreaterThan(0)
- .describedAs("The median processing time should be non-0");
- return metrics.incomingBytes.getCount();
+ .describedAs("No SSTable should have been streamed so far from node" + node + " to node" + peer)
+ .isEqualTo(0);
});
+ }
- // Assert metrics in node 1
- cluster.get(1).runOnInstance(() -> {
- StreamingMetrics metrics = StreamingMetrics.get(node2Address);
- assertThat(metrics.incomingBytes.getCount())
- .isEqualTo(0).describedAs("There should not be sstables streamed from the peer.");
- assertThat(metrics.outgoingBytes.getCount())
- .isEqualTo(transmittedBytes)
- .describedAs("The outgoingBytes count in node1 should be equals to incomingBytes count in node2");
- assertThat(metrics.incomingProcessTime.getCount())
- .isEqualTo(0)
- .describedAs("There should be no files streamed from the peer.");
+ private long checkDataSent(Cluster cluster, int node, int peer)
+ {
+ InetAddressAndPort address = getNodeAddress(cluster, peer);
+ return cluster.get(node).callOnInstance(() -> {
+
+ StreamingMetrics metrics = StreamingMetrics.get(address);
+
+ long outgoingBytes = metrics.outgoingBytes.getCount();
+ assertThat(outgoingBytes)
+ .describedAs("There should be data streamed from node" + node + " to node" + peer)
+ .isGreaterThan(0);
+
+ return outgoingBytes;
});
}
+
+ private void checkDataReceived(Cluster cluster, int node, int peer, long receivedBytes, int files)
+ {
+ InetAddressAndPort address = getNodeAddress(cluster, peer);
+ cluster.get(node).runOnInstance(() -> {
+
+ StreamingMetrics metrics = StreamingMetrics.get(address);
+
+ long actual = metrics.incomingBytes.getCount();
+ assertThat(actual)
+ .describedAs("The amount of data received by node" + node + " from node" + peer + " is not the expected one. [expected: " + receivedBytes + ", actual: " + actual + "]")
+ .isEqualTo(receivedBytes);
+
+ actual = metrics.incomingProcessTime.getCount();
+ // The incomingProcessTime timer is updated for each incoming file. Consequently, incomingProcessTime.getCount() should be equal to the number of files received by the node.
+ assertThat(actual)
+ .describedAs("The amount of files received by node" + node + " from node" + peer + " is not the expected one. [expected: " + files + ", actual: " + actual + "]")
+ .isEqualTo(files);
+
+ if (metrics.incomingProcessTime.getCount() != 0)
+ {
+ assertThat(metrics.incomingProcessTime.getSnapshot().getMedian())
+ .describedAs("The median processing time for data streamed from node"+ peer + " to node" + node + " should be non-0")
+ .isGreaterThan(0);
+ }
+ });
+ }
+
+ private void checkTotalDataSent(Cluster cluster,
+ int node,
+ long outgoingBytes,
+ long outgoingRepairBytes,
+ long outgoingRepairSSTables)
+ {
+ cluster.get(node).runOnInstance(() -> {
+
+ long actual = StreamingMetrics.totalOutgoingBytes.getCount();
+ assertThat(actual)
+ .describedAs("The total amount of data sent by the node" + node + " is not the expected one. [expected: " + outgoingBytes + ", actual: " + actual + "]")
+ .isEqualTo(outgoingBytes);
+
+ actual = StreamingMetrics.totalOutgoingRepairBytes.getCount();
+ assertThat(actual)
+ .describedAs("The total amount of data sent by the node" + node + " for repair is not the expected one. [expected: " + outgoingRepairBytes + ", actual: " + actual + "]")
+ .isEqualTo(outgoingRepairBytes);
+
+ actual = StreamingMetrics.totalOutgoingRepairSSTables.getCount();
+ assertThat(actual)
+ .describedAs("The total amount of SSTables sent by the node" + node + " for repair is not the expected one. [expected: " + outgoingRepairSSTables + ", actual: " + actual + "]")
+ .isEqualTo(outgoingRepairSSTables);
+ });
+ }
+
+ private void checkTotalDataReceived(Cluster cluster, int node, long incomingBytes)
+ {
+ cluster.get(node).runOnInstance(() -> {
+
+ long actual = StreamingMetrics.totalIncomingBytes.getCount();
+ assertThat(actual)
+ .describedAs("The total amount of data received by the node" + node + " is not the expected one. [expected: " + incomingBytes + ", actual: " + actual + "]")
+ .isEqualTo(incomingBytes);
+ });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org