You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/09/18 19:41:13 UTC
[cassandra] branch trunk updated: Show the progress of data streaming and index build
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b936d3e Show the progress of data streaming and index build
b936d3e is described below
commit b936d3e95db3180a2f1f32ed7c80543e81971fde
Author: Stefan Miklosovic <st...@instaclustr.com>
AuthorDate: Fri Sep 18 11:40:19 2020 -0700
Show the progress of data streaming and index build
patch by Stefan Miklosovic; reviewed by Benjamin Lerer, Berenguer Blasi, David Capwell for CASSANDRA-15406
---
CHANGES.txt | 1 +
.../streaming/CassandraCompressedStreamReader.java | 2 +-
.../db/streaming/CassandraStreamReader.java | 2 +-
.../apache/cassandra/streaming/ProgressInfo.java | 2 +-
.../cassandra/streaming/StreamReceiveTask.java | 4 +-
.../apache/cassandra/tools/nodetool/NetStats.java | 64 ++-
.../cassandra/distributed/impl/Instance.java | 14 +-
.../shared/NodeToolResultWithOutput.java | 48 ++
.../test/AbstractNetstatsBootstrapStreaming.java | 85 ++++
.../test/AbstractNetstatsStreaming.java | 548 +++++++++++++++++++++
...WithEntireSSTablesCompressionStreamingTest.java | 36 ++
...houtEntireSSTablesCompressionStreamingTest.java | 36 ++
.../test/NetstatsRepairStreamingTest.java | 88 ++++
.../cassandra/distributed/util/NodetoolUtils.java | 68 +++
14 files changed, 978 insertions(+), 20 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4916faa..42b7aa7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
* Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
* Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
* NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949)
+ * Show the progress of data streaming and index build (CASSANDRA-15406)
Merged from 3.11:
* Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
* Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index 2491fe1..ff9e6f7 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -97,7 +97,7 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
{
writePartition(deserializer, writer);
// when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(filename, ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
+ session.progress(filename + '-' + fileSeqNum, ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
}
assert in.getBytesRead() == sectionLength;
}
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 686d874..6835fad 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -124,7 +124,7 @@ public class CassandraStreamReader implements IStreamReader
{
writePartition(deserializer, writer);
// TODO move this to BytesReadTracker
- session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+ session.progress(writer.getFilename() + '-' + fileSeqNum, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index ac91855..2b306f8 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -110,7 +110,7 @@ public class ProgressInfo implements Serializable
{
StringBuilder sb = new StringBuilder(fileName);
sb.append(" ").append(currentBytes);
- sb.append("/").append(totalBytes).append(" bytes");
+ sb.append("/").append(totalBytes).append(" bytes ");
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
sb.append("idx:").append(sessionIndex);
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 25977a5..d127edb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -85,8 +85,8 @@ public class StreamReceiveTask extends StreamTask
remoteStreamsReceived += stream.getNumFiles();
bytesReceived += stream.getSize();
Preconditions.checkArgument(tableId.equals(stream.getTableId()));
- logger.debug("received {} of {} total files {} of total bytes {}", remoteStreamsReceived, totalStreams,
- bytesReceived, totalSize);
+ logger.debug("received {} of {} total files, {} of total bytes {}", remoteStreamsReceived, totalStreams,
+ bytesReceived, stream.getSize());
receiver.received(stream);
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
index c0500ca..e86505b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
@@ -59,10 +59,29 @@ public class NetStats extends NodeToolCmd
System.out.printf("%n");
if (!info.receivingSummaries.isEmpty())
{
+ long totalFilesToReceive = info.getTotalFilesToReceive();
+ long totalBytesToReceive = info.getTotalSizeToReceive();
+ long totalFilesReceived = info.getTotalFilesReceived();
+ long totalSizeReceived = info.getTotalSizeReceived();
+ double percentageFilesReceived = ((double) totalFilesReceived / totalFilesToReceive) * 100;
+ double percentageSizesReceived = ((double) totalSizeReceived / totalBytesToReceive) * 100;
+
if (humanReadable)
- System.out.printf(" Receiving %d files, %s total. Already received %d files, %s total%n", info.getTotalFilesToReceive(), FileUtils.stringifyFileSize(info.getTotalSizeToReceive()), info.getTotalFilesReceived(), FileUtils.stringifyFileSize(info.getTotalSizeReceived()));
+ System.out.printf(" Receiving %d files, %s total. Already received %d files (%.2f%%), %s total (%.2f%%)%n",
+ totalFilesToReceive,
+ FileUtils.stringifyFileSize(totalBytesToReceive),
+ totalFilesReceived,
+ percentageFilesReceived,
+ FileUtils.stringifyFileSize(totalSizeReceived),
+ percentageSizesReceived);
else
- System.out.printf(" Receiving %d files, %d bytes total. Already received %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive(), info.getTotalFilesReceived(), info.getTotalSizeReceived());
+ System.out.printf(" Receiving %d files, %d bytes total. Already received %d files (%.2f%%), %d bytes total (%.2f%%)%n",
+ totalFilesToReceive,
+ totalBytesToReceive,
+ totalFilesReceived,
+ percentageFilesReceived,
+ totalSizeReceived,
+ percentageSizesReceived);
for (ProgressInfo progress : info.getReceivingFiles())
{
System.out.printf(" %s%n", progress.toString(printPort));
@@ -70,10 +89,29 @@ public class NetStats extends NodeToolCmd
}
if (!info.sendingSummaries.isEmpty())
{
+ long totalFilesToSend = info.getTotalFilesToSend();
+ long totalSizeToSend = info.getTotalSizeToSend();
+ long totalFilesSent = info.getTotalFilesSent();
+ long totalSizeSent = info.getTotalSizeSent();
+ double percentageFilesSent = ((double) totalFilesSent / totalFilesToSend) * 100;
+ double percentageSizeSent = ((double) totalSizeSent / totalSizeToSend) * 100;
+
if (humanReadable)
- System.out.printf(" Sending %d files, %s total. Already sent %d files, %s total%n", info.getTotalFilesToSend(), FileUtils.stringifyFileSize(info.getTotalSizeToSend()), info.getTotalFilesSent(), FileUtils.stringifyFileSize(info.getTotalSizeSent()));
+ System.out.printf(" Sending %d files, %s total. Already sent %d files (%.2f%%), %s total (%.2f%%)%n",
+ totalFilesToSend,
+ FileUtils.stringifyFileSize(totalSizeToSend),
+ totalFilesSent,
+ percentageFilesSent,
+ FileUtils.stringifyFileSize(totalSizeSent),
+ percentageSizeSent);
else
- System.out.printf(" Sending %d files, %d bytes total. Already sent %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend(), info.getTotalFilesSent(), info.getTotalSizeSent());
+ System.out.printf(" Sending %d files, %d bytes total. Already sent %d files (%.2f%%), %d bytes total (%.2f%%) %n",
+ totalFilesToSend,
+ totalSizeToSend,
+ totalFilesSent,
+ percentageFilesSent,
+ totalSizeSent,
+ percentageSizeSent);
for (ProgressInfo progress : info.getSendingFiles())
{
System.out.printf(" %s%n", progress.toString(printPort));
@@ -98,35 +136,35 @@ public class NetStats extends NodeToolCmd
long dropped;
pending = 0;
- for (int n : ms.getLargeMessagePendingTasks().values())
+ for (int n : ms.getLargeMessagePendingTasksWithPort().values())
pending += n;
completed = 0;
- for (long n : ms.getLargeMessageCompletedTasks().values())
+ for (long n : ms.getLargeMessageCompletedTasksWithPort().values())
completed += n;
dropped = 0;
- for (long n : ms.getLargeMessageDroppedTasks().values())
+ for (long n : ms.getLargeMessageDroppedTasksWithPort().values())
dropped += n;
System.out.printf("%-25s%10s%10s%15s%10s%n", "Large messages", "n/a", pending, completed, dropped);
pending = 0;
- for (int n : ms.getSmallMessagePendingTasks().values())
+ for (int n : ms.getSmallMessagePendingTasksWithPort().values())
pending += n;
completed = 0;
- for (long n : ms.getSmallMessageCompletedTasks().values())
+ for (long n : ms.getSmallMessageCompletedTasksWithPort().values())
completed += n;
dropped = 0;
- for (long n : ms.getSmallMessageDroppedTasks().values())
+ for (long n : ms.getSmallMessageDroppedTasksWithPort().values())
dropped += n;
System.out.printf("%-25s%10s%10s%15s%10s%n", "Small messages", "n/a", pending, completed, dropped);
pending = 0;
- for (int n : ms.getGossipMessagePendingTasks().values())
+ for (int n : ms.getGossipMessagePendingTasksWithPort().values())
pending += n;
completed = 0;
- for (long n : ms.getGossipMessageCompletedTasks().values())
+ for (long n : ms.getGossipMessageCompletedTasksWithPort().values())
completed += n;
dropped = 0;
- for (long n : ms.getGossipMessageDroppedTasks().values())
+ for (long n : ms.getGossipMessageDroppedTasksWithPort().values())
dropped += n;
System.out.printf("%-25s%10s%10s%15s%10s%n", "Gossip messages", "n/a", pending, completed, dropped);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 5395cb8..038698b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -605,18 +605,28 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}).call();
}
- private static class DTestNodeTool extends NodeTool {
+ public static class DTestNodeTool extends NodeTool {
private final StorageServiceMBean storageProxy;
private final CollectingNotificationListener notifications = new CollectingNotificationListener();
private Throwable latestError;
- DTestNodeTool(boolean withNotifications) {
+ public DTestNodeTool(boolean withNotifications) {
super(new InternalNodeProbeFactory(withNotifications));
storageProxy = new InternalNodeProbe(withNotifications).getStorageService();
storageProxy.addNotificationListener(notifications, null, null);
}
+ public List<Notification> getNotifications()
+ {
+ return new ArrayList<>(notifications.notifications);
+ }
+
+ public Throwable getLatestError()
+ {
+ return latestError;
+ }
+
public int execute(String... args)
{
try
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java b/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java
new file mode 100644
index 0000000..cb94887
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.cassandra.distributed.api.NodeToolResult;
+
+public class NodeToolResultWithOutput
+{
+ private final NodeToolResult result;
+ private final ByteArrayOutputStream stdout;
+ private final ByteArrayOutputStream stderr;
+
+ public NodeToolResultWithOutput(NodeToolResult result, ByteArrayOutputStream stdout, ByteArrayOutputStream stderr) {
+ this.result = result;
+ this.stdout = stdout;
+ this.stderr = stderr;
+ }
+
+ public NodeToolResult getResult() {
+ return this.result;
+ }
+
+ public String getStdout() {
+ return this.stdout.toString();
+ }
+
+ public String getStderr() {
+ return this.stderr.toString();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
new file mode 100644
index 0000000..7aca7bd
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstatsStreaming
+{
+ protected void executeTest(final boolean streamEntireSSTables,
+ final boolean compressionEnabled) throws Exception
+ {
+ final Cluster.Builder builder = builder().withNodes(1)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(2))
+ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+ .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
+ .set("stream_throughput_outbound_megabits_per_sec", 1)
+ .set("compaction_throughput_mb_per_sec", 1)
+ .set("stream_entire_sstables", streamEntireSSTables));
+
+ try (final Cluster cluster = builder.withNodes(1).start())
+ {
+ // populate data only against 1 node first
+
+ createTable(cluster, 1, compressionEnabled);
+
+ cluster.get(1).nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
+
+ if (compressionEnabled)
+ {
+ populateData(true);
+ }
+ else
+ {
+ populateData(false);
+ }
+
+ cluster.get(1).flush("netstats_test");
+
+ // then bootstrap the second one, upon joining,
+ // we should see that netstats shows how SSTables are being streamed on the first node
+
+ final IInstanceConfig config = cluster.newInstanceConfig();
+ config.set("auto_bootstrap", true);
+
+ IInvokableInstance secondNode = cluster.bootstrap(config);
+
+ final Future<?> startupRunnable = executorService.submit((Runnable) secondNode::startup);
+ final Future<AbstractNetstatsStreaming.NetstatResults> netstatsFuture = executorService.submit(new NetstatsCallable(cluster.get(1)));
+
+ final AbstractNetstatsStreaming.NetstatResults results = netstatsFuture.get(1, MINUTES);
+ startupRunnable.get(2, MINUTES);
+
+ results.assertSuccessful();
+
+ AbstractNetstatsStreaming.NetstatsOutputParser.validate(AbstractNetstatsStreaming.NetstatsOutputParser.parse(results));
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java
new file mode 100644
index 0000000..85e3e23
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NodeToolResultWithOutput;
+import org.apache.cassandra.distributed.util.NodetoolUtils;
+import org.apache.cassandra.utils.Pair;
+
+import static java.util.stream.Collectors.toList;
+
+public abstract class AbstractNetstatsStreaming extends TestBaseImpl
+{
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractNetstatsStreaming.class);
+
+ protected ExecutorService executorService;
+
+ @Before
+ public void setup()
+ {
+ executorService = Executors.newCachedThreadPool();
+ }
+
+ @After
+ public void teardown() throws Exception
+ {
+ try
+ {
+ executorService.shutdownNow();
+
+ if (!executorService.isShutdown())
+ {
+ if (!executorService.awaitTermination(1, TimeUnit.MINUTES))
+ {
+ throw new IllegalStateException("Unable to shutdown executor for invoking netstat commands.");
+ }
+ }
+ }
+ finally
+ {
+ executorService = null;
+ }
+ }
+
+ protected void changeReplicationFactor()
+ {
+ try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session s = c.connect())
+ {
+ s.execute("ALTER KEYSPACE netstats_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+ }
+ }
+
+ protected void createTable(Cluster cluster, int replicationFactor, boolean compressionEnabled)
+ {
+ // replication factor is 1
+ cluster.schemaChange("CREATE KEYSPACE netstats_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
+
+ if (compressionEnabled)
+ {
+ cluster.schemaChange("CREATE TABLE netstats_test.test_table (id uuid primary key) WITH compression = {'enabled':'true', 'class': 'LZ4Compressor'};");
+ }
+ else
+ {
+ cluster.schemaChange("CREATE TABLE netstats_test.test_table (id uuid primary key) WITH compression = {'enabled':'false'};");
+ }
+ }
+
+ protected void populateData(boolean forCompressedTest)
+ {
+ try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session s = c.connect("netstats_test"))
+ {
+ int records = forCompressedTest ? 100_000 : 70_000;
+
+ for (int i = 0; i < records; i++)
+ {
+ s.execute("INSERT INTO test_table (id) VALUES (" + UUID.randomUUID() + ")");
+ }
+ }
+ }
+
+ protected static class NetstatsOutputParser
+ {
+ public static List<Pair<ReceivingStastistics, SendingStatistics>> parse(final NetstatResults results)
+ {
+ final Set<String> outputs = new LinkedHashSet<>();
+
+ results.netstatOutputs.stream()
+ .map(NodeToolResultWithOutput::getStdout)
+ .filter(output -> !output.contains("Not sending any streams"))
+ .filter(output -> output.contains("Receiving") || output.contains("Sending"))
+ .forEach(outputs::add);
+
+ final List<Pair<ReceivingStastistics, SendingStatistics>> parsed = new ArrayList<>();
+
+ for (final String output : outputs)
+ {
+ boolean processingReceiving = false;
+ boolean processingSending = false;
+
+ final ReceivingStastistics receivingStastistics = new ReceivingStastistics();
+ final SendingStatistics sendingStatistics = new SendingStatistics();
+
+ final List<String> sanitisedOutput = Stream.of(output.split("\n"))
+ .map(String::trim)
+ .filter(line -> !line.isEmpty())
+ // sometimes logs are mangled into output
+ .filter(line -> Stream.of("DEBUG", "INFO", "ERROR", "WARN").noneMatch(line::contains))
+ .filter(line -> Stream.of("Mode:", "Read", "Attempted", "Mismatch", "Pool", "Large", "Small", "Gossip").noneMatch(line::startsWith))
+ .collect(toList());
+
+ for (final String outputLine : sanitisedOutput)
+ {
+ if (outputLine.startsWith("Receiving"))
+ {
+ processingReceiving = true;
+ processingSending = false;
+
+ receivingStastistics.parseHeader(outputLine);
+ }
+ else if (outputLine.startsWith("Sending"))
+ {
+ processingSending = true;
+ processingReceiving = false;
+
+ sendingStatistics.parseHeader(outputLine);
+ }
+ else if (processingReceiving)
+ {
+ receivingStastistics.parseTable(outputLine);
+ }
+ else if (processingSending)
+ {
+ sendingStatistics.parseTable(outputLine);
+ }
+ }
+
+ parsed.add(Pair.create(receivingStastistics, sendingStatistics));
+ }
+
+ return parsed;
+ }
+
+ public static void validate(List<Pair<ReceivingStastistics, SendingStatistics>> result)
+ {
+ List<SendingStatistics> sendingStatistics = result.stream().map(pair -> pair.right).collect(toList());
+
+ if (sendingStatistics.size() >= 2)
+ {
+ for (int i = 0; i < sendingStatistics.size() - 1; i++)
+ {
+ SendingStatistics.SendingHeader header1 = sendingStatistics.get(i).sendingHeader;
+ SendingStatistics.SendingHeader header2 = sendingStatistics.get(i + 1).sendingHeader;
+
+ if (header1 != null && header2 != null)
+ {
+ Assert.assertTrue(header1.compareTo(header2) <= 0);
+ }
+ }
+ }
+
+ for (SendingStatistics sending : sendingStatistics)
+ {
+ if (sending.sendingHeader != null)
+ {
+ Assert.assertEquals(sending.sendingHeader.bytesTotalSoFar, (long) sending.sendingSSTable.stream().map(table -> table.bytesSent).reduce(Long::sum).orElseGet(() -> 0L));
+ Assert.assertTrue(sending.sendingHeader.bytesTotal >= sending.sendingSSTable.stream().map(table -> table.bytesInTotal).reduce(Long::sum).orElseGet(() -> 0L));
+
+ if (sending.sendingHeader.bytesTotalSoFar != 0)
+ {
+ double progress = (double) sending.sendingSSTable.stream().map(table -> table.bytesSent).reduce(Long::sum).orElse(0L) / (double) sending.sendingHeader.bytesTotal;
+
+ Assert.assertTrue((int) sending.sendingHeader.progressBytes >= (int) (progress * 100));
+
+ Assert.assertTrue((double) sending.sendingHeader.bytesTotal >= (double) sending.sendingSSTable.stream().map(table -> table.bytesInTotal).reduce(Long::sum).orElse(0L));
+ }
+ }
+ }
+
+ List<ReceivingStastistics> receivingStastistics = result.stream().map(pair -> pair.left).collect(toList());
+
+ for (ReceivingStastistics receiving : receivingStastistics)
+ {
+ if (receiving.receivingHeader != null)
+ {
+ Assert.assertTrue(receiving.receivingHeader.bytesTotal >= receiving.receivingTables.stream().map(table -> table.receivedSoFar).reduce(Long::sum).orElse(0L));
+ Assert.assertEquals(receiving.receivingHeader.bytesTotalSoFar, (long) receiving.receivingTables.stream().map(table -> table.receivedSoFar).reduce(Long::sum).orElse(0L));
+ }
+ }
+ }
+
+ public static class ReceivingStastistics
+ {
+ public ReceivingHeader receivingHeader;
+ public List<ReceivingTable> receivingTables = new ArrayList<>();
+
+ public void parseHeader(String header)
+ {
+ receivingHeader = ReceivingHeader.parseHeader(header);
+ }
+
+ public void parseTable(String table)
+ {
+ receivingTables.add(ReceivingTable.parseTable(table));
+ }
+
+ public String toString()
+ {
+ return "ReceivingStastistics{" +
+ "receivingHeader=" + receivingHeader +
+ ", receivingTables=" + receivingTables +
+ '}';
+ }
+
+ public static class ReceivingHeader
+ {
+ private static final Pattern receivingHeaderPattern = Pattern.compile(
+ "Receiving (.*) files, (.*) bytes total. Already received (.*) files \\((.*)%\\), (.*) bytes total \\((.*)%\\)"
+ );
+
+ int totalReceiving = 0;
+ long bytesTotal = 0;
+ int alreadyReceived = 0;
+ double progressFiles = 0.0f;
+ long bytesTotalSoFar = 0;
+ double progressBytes = 0.0f;
+
+ public static ReceivingHeader parseHeader(String header)
+ {
+ final Matcher matcher = receivingHeaderPattern.matcher(header);
+
+ if (matcher.matches())
+ {
+ final ReceivingHeader receivingHeader = new ReceivingHeader();
+
+ receivingHeader.totalReceiving = Integer.parseInt(matcher.group(1));
+ receivingHeader.bytesTotal = Long.parseLong(matcher.group(2));
+ receivingHeader.alreadyReceived = Integer.parseInt(matcher.group(3));
+ receivingHeader.progressFiles = Double.parseDouble(matcher.group(4));
+ receivingHeader.bytesTotalSoFar = Long.parseLong(matcher.group(5));
+ receivingHeader.progressBytes = Double.parseDouble(matcher.group(6));
+
+ return receivingHeader;
+ }
+
+ throw new IllegalStateException("Header does not match - " + header);
+ }
+
+ public String toString()
+ {
+ return "ReceivingHeader{" +
+ "totalReceiving=" + totalReceiving +
+ ", bytesTotal=" + bytesTotal +
+ ", alreadyReceived=" + alreadyReceived +
+ ", progressFiles=" + progressFiles +
+ ", bytesTotalSoFar=" + bytesTotalSoFar +
+ ", progressBytes=" + progressBytes +
+ '}';
+ }
+ }
+
+ public static class ReceivingTable
+ {
+ long receivedSoFar = 0;
+ long toReceive = 0;
+ double progress = 0.0;
+
+ private static final Pattern recievingFilePattern = Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) received from (.*)");
+
+ public static ReceivingTable parseTable(String table)
+ {
+ final Matcher matcher = recievingFilePattern.matcher(table);
+
+ if (matcher.matches())
+ {
+ final ReceivingTable receivingTable = new ReceivingTable();
+
+ receivingTable.receivedSoFar = Long.parseLong(matcher.group(2));
+ receivingTable.toReceive = Long.parseLong(matcher.group(3));
+ receivingTable.progress = Double.parseDouble(matcher.group(4));
+
+ return receivingTable;
+ }
+
+ throw new IllegalStateException("Table line does not match - " + table);
+ }
+
+ public String toString()
+ {
+ return "ReceivingTable{" +
+ "receivedSoFar=" + receivedSoFar +
+ ", toReceive=" + toReceive +
+ ", progress=" + progress +
+ '}';
+ }
+ }
+ }
+
+ public static class SendingStatistics
+ {
+ public SendingHeader sendingHeader;
+ public List<SendingSSTable> sendingSSTable = new ArrayList<>();
+
+ public void parseHeader(String outputLine)
+ {
+ this.sendingHeader = SendingHeader.parseHeader(outputLine);
+ }
+
+ public void parseTable(String table)
+ {
+ sendingSSTable.add(SendingSSTable.parseTable(table));
+ }
+
+ public String toString()
+ {
+ return "SendingStatistics{" +
+ "sendingHeader=" + sendingHeader +
+ ", sendingSSTable=" + sendingSSTable +
+ '}';
+ }
+
+ public static class SendingHeader implements Comparable<SendingHeader>
+ {
+ private static final Pattern sendingHeaderPattern = Pattern.compile(
+ "Sending (.*) files, (.*) bytes total. Already sent (.*) files \\((.*)%\\), (.*) bytes total \\((.*)%\\)"
+ );
+
+ int totalSending = 0;
+ long bytesTotal = 0;
+ int alreadySent = 0;
+ double progressFiles = 0.0f;
+ long bytesTotalSoFar = 0;
+ double progressBytes = 0.0f;
+
+ public static SendingHeader parseHeader(String header)
+ {
+ final Matcher matcher = sendingHeaderPattern.matcher(header);
+
+ if (matcher.matches())
+ {
+ final SendingHeader sendingHeader = new SendingHeader();
+
+ sendingHeader.totalSending = Integer.parseInt(matcher.group(1));
+ sendingHeader.bytesTotal = Long.parseLong(matcher.group(2));
+ sendingHeader.alreadySent = Integer.parseInt(matcher.group(3));
+ sendingHeader.progressFiles = Double.parseDouble(matcher.group(4));
+ sendingHeader.bytesTotalSoFar = Long.parseLong(matcher.group(5));
+ sendingHeader.progressBytes = Double.parseDouble(matcher.group(6));
+
+ return sendingHeader;
+ }
+
+ throw new IllegalStateException("Header does not match - " + header);
+ }
+
+ public String toString()
+ {
+ return "SendingHeader{" +
+ "totalSending=" + totalSending +
+ ", bytesTotal=" + bytesTotal +
+ ", alreadySent=" + alreadySent +
+ ", progressFiles=" + progressFiles +
+ ", bytesTotalSoFar=" + bytesTotalSoFar +
+ ", progressBytes=" + progressBytes +
+ '}';
+ }
+
+
+ public int compareTo(SendingHeader o)
+ {
+ // progress on bytes has to be strictly lower,
+ // even alreadySent and progressFiles and progressBytes are same,
+ // bytesTotalSoFar has to be lower, bigger or same
+
+ if (alreadySent <= o.alreadySent
+ && progressFiles <= o.progressFiles
+ && bytesTotalSoFar <= o.bytesTotalSoFar
+ && progressBytes <= o.progressBytes)
+ {
+ return -1;
+ }
+ else if (alreadySent == o.alreadySent
+ && progressFiles == o.progressFiles
+ && bytesTotalSoFar == o.bytesTotalSoFar
+ && progressBytes == o.progressBytes)
+ {
+ return 0;
+ }
+ else if (alreadySent >= o.alreadySent
+ && progressFiles >= o.progressFiles
+ && bytesTotalSoFar > o.bytesTotalSoFar
+ && progressBytes >= o.progressBytes)
+ {
+ return 1;
+ }
+ else
+ {
+ throw new IllegalStateException(String.format("Could not compare arguments %s and %s", this, o));
+ }
+ }
+ }
+
+ public static class SendingSSTable
+ {
+ private static final Pattern sendingFilePattern = Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) sent to (.*)");
+
+ long bytesSent = 0;
+ long bytesInTotal = 0;
+ double progress = 0.0f;
+
+ public static SendingSSTable parseTable(String table)
+ {
+ final Matcher matcher = sendingFilePattern.matcher(table);
+
+ if (matcher.matches())
+ {
+ final SendingSSTable sendingSSTable = new SendingSSTable();
+
+ sendingSSTable.bytesSent = Long.parseLong(matcher.group(2));
+ sendingSSTable.bytesInTotal = Long.parseLong(matcher.group(3));
+ sendingSSTable.progress = Double.parseDouble(matcher.group(4));
+
+ return sendingSSTable;
+ }
+
+ throw new IllegalStateException("Table does not match - " + table);
+ }
+
+ public String toString()
+ {
+ return "SendingSSTable{" +
+ "bytesSent=" + bytesSent +
+ ", bytesInTotal=" + bytesInTotal +
+ ", progress=" + progress +
+ '}';
+ }
+ }
+ }
+ }
+
+ protected static final class NetstatResults
+ {
+ private final List<NodeToolResultWithOutput> netstatOutputs = new ArrayList<>();
+
+ public void add(NodeToolResultWithOutput result)
+ {
+ netstatOutputs.add(result);
+ }
+
+ public void assertSuccessful()
+ {
+ for (final NodeToolResultWithOutput result : netstatOutputs)
+ {
+ Assert.assertEquals(result.getResult().getRc(), 0);
+ Assert.assertTrue(result.getStderr().isEmpty());
+ }
+ }
+ }
+
+ protected static class NetstatsCallable implements Callable<NetstatResults>
+ {
+ private final IInvokableInstance node;
+
+ public NetstatsCallable(final IInvokableInstance node)
+ {
+ this.node = node;
+ }
+
+ public NetstatResults call() throws Exception
+ {
+ final NetstatResults results = new NetstatResults();
+
+ boolean sawAnyStreamingOutput = false;
+
+ while (true)
+ {
+ try
+ {
+ final NodeToolResultWithOutput result = NodetoolUtils.nodetool(node, false, "netstats");
+
+ logger.info(node.broadcastAddress().toString() + " " + result.getStdout());
+
+ if (!sawAnyStreamingOutput)
+ {
+ if (result.getStdout().contains("Receiving") || result.getStdout().contains("Sending"))
+ {
+ sawAnyStreamingOutput = true;
+ }
+ }
+
+ if (sawAnyStreamingOutput && (!result.getStdout().contains("Receiving") && !result.getStdout().contains("Sending")))
+ {
+ break;
+ }
+
+ results.add(result);
+
+ Thread.currentThread().sleep(500);
+ }
+ catch (final Exception ex)
+ {
+ System.out.println(ex.getMessage());
+ Thread.currentThread().sleep(500);
+ }
+ }
+
+ return results;
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
new file mode 100644
index 0000000..7c53426
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+/**
+ * Runs the bootstrap netstats scenario from {@link AbstractNetstatsBootstrapStreaming} with the
+ * first {@code executeTest} flag set, once per value of the second flag. Judging by the test
+ * names, the first flag selects entire-sstable streaming and the second table compression.
+ */
+public class NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest extends AbstractNetstatsBootstrapStreaming
+{
+    private static final boolean STREAM_ENTIRE_SSTABLES = true;
+
+    @Test
+    public void testWithStreamingEntireSSTablesWithCompression() throws Exception
+    {
+        final boolean compression = true;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+
+    @Test
+    public void testWithStreamingEntireSSTablesWithoutCompression() throws Exception
+    {
+        final boolean compression = false;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java
new file mode 100644
index 0000000..68b16c2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+/**
+ * Runs the bootstrap netstats scenario from {@link AbstractNetstatsBootstrapStreaming} with the
+ * first {@code executeTest} flag cleared, once per value of the second flag. Judging by the test
+ * names, the first flag selects entire-sstable streaming and the second table compression.
+ */
+public class NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest extends AbstractNetstatsBootstrapStreaming
+{
+    private static final boolean STREAM_ENTIRE_SSTABLES = false;
+
+    @Test
+    public void testWithoutStreamingEntireSSTablesWithCompression() throws Exception
+    {
+        final boolean compression = true;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+
+    @Test
+    public void testWithoutStreamingEntireSSTablesWithoutCompression() throws Exception
+    {
+        final boolean compression = false;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java
new file mode 100644
index 0000000..5f74c77
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+/**
+ * Checks the output of {@code nodetool netstats} while a repair is streaming data between
+ * two nodes, with table compression both enabled and disabled.
+ */
+public class NetstatsRepairStreamingTest extends AbstractNetstatsStreaming
+{
+    @Test
+    public void testWithCompressionEnabled() throws Exception
+    {
+        executeTest(true);
+    }
+
+    @Test
+    public void testWithCompressionDisabled() throws Exception
+    {
+        executeTest(false);
+    }
+
+    /**
+     * Starts a two-node cluster with throttled streaming and compaction, loads and flushes data,
+     * changes the replication factor and then repairs while a background task polls netstats
+     * on node 1. The collected outputs must all be successful and pass the parser's validation.
+     *
+     * @param compressionEnabled passed to table creation and data population
+     */
+    private void executeTest(boolean compressionEnabled) throws Exception
+    {
+        final ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+        try (final Cluster cluster = Cluster.build()
+                                            .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+                                            .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
+                                                                        .set("stream_throughput_outbound_megabits_per_sec", 1)
+                                                                        .set("compaction_throughput_mb_per_sec", 1)
+                                                                        .set("stream_entire_sstables", false)).start())
+        {
+            final IInvokableInstance node1 = cluster.get(1);
+            final IInvokableInstance node2 = cluster.get(2);
+
+            createTable(cluster, 1, compressionEnabled);
+
+            node1.nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
+            node2.nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
+
+            populateData(compressionEnabled);
+
+            node1.flush("netstats_test");
+            node2.flush("netstats_test");
+
+            // change RF from 1 to 2 so we need to repair it; repairing will cause streaming shown in netstats
+            changeReplicationFactor();
+
+            final Future<NetstatResults> resultsFuture1 = executorService.submit(new NetstatsCallable(node1));
+
+            node1.nodetoolResult("repair", "netstats_test").asserts().success();
+
+            final NetstatResults results = resultsFuture1.get(1, MINUTES);
+
+            results.assertSuccessful();
+
+            NetstatsOutputParser.validate(NetstatsOutputParser.parse(results));
+        }
+        finally
+        {
+            // release the polling thread even when the test fails or the future times out
+            executorService.shutdownNow();
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java b/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java
new file mode 100644
index 0000000..1bb6adf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.distributed.shared.NodeToolResultWithOutput;
+
+/**
+ * Helpers for running nodetool commands against an in-JVM dtest instance while capturing
+ * what the command prints to standard output and standard error.
+ */
+public final class NodetoolUtils
+{
+    private NodetoolUtils()
+    {
+        // static helpers only
+    }
+
+    /**
+     * Runs nodetool with notifications enabled.
+     *
+     * @param inst instance to run the command on
+     * @param args nodetool command and its arguments
+     * @return the nodetool result together with the captured output
+     */
+    public static NodeToolResultWithOutput nodetool(IInvokableInstance inst, String... args)
+    {
+        return nodetool(inst, true, args);
+    }
+
+    /**
+     * Runs nodetool on the given instance. For the duration of the command, {@code System.out}
+     * and {@code System.err} are replaced by in-memory streams; the previous streams are put
+     * back in a {@code finally} block.
+     * <p>
+     * NOTE(review): {@code System.setOut}/{@code System.setErr} are JVM-wide, so concurrent
+     * callers could capture each other's output - confirm callers do not overlap.
+     *
+     * @param inst              instance to run the command on
+     * @param withNotifications passed to the {@link Instance.DTestNodeTool} constructor
+     * @param args              nodetool command and its arguments
+     * @return the nodetool result together with the captured output
+     */
+    public static NodeToolResultWithOutput nodetool(IInvokableInstance inst, boolean withNotifications, String... args)
+    {
+        return inst.callOnInstance(() -> {
+            final PrintStream savedOut = System.out;
+            final PrintStream savedErr = System.err;
+            savedOut.flush();
+            savedErr.flush();
+
+            final ByteArrayOutputStream capturedOut = new ByteArrayOutputStream();
+            final ByteArrayOutputStream capturedErr = new ByteArrayOutputStream();
+
+            try (PrintStream redirectedOut = new PrintStream(capturedOut);
+                 PrintStream redirectedErr = new PrintStream(capturedErr))
+            {
+                System.setOut(redirectedOut);
+                System.setErr(redirectedErr);
+
+                final Instance.DTestNodeTool tool = new Instance.DTestNodeTool(withNotifications);
+                final int exitCode = tool.execute(args);
+
+                return new NodeToolResultWithOutput(new NodeToolResult(args, exitCode, tool.getNotifications(), tool.getLatestError()),
+                                                    capturedOut,
+                                                    capturedErr);
+            }
+            finally
+            {
+                System.setOut(savedOut);
+                System.setErr(savedErr);
+            }
+        });
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org