You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/09/18 19:41:13 UTC

[cassandra] branch trunk updated: Show the progress of data streaming and index build

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b936d3e  Show the progress of data streaming and index build
b936d3e is described below

commit b936d3e95db3180a2f1f32ed7c80543e81971fde
Author: Stefan Miklosovic <st...@instaclustr.com>
AuthorDate: Fri Sep 18 11:40:19 2020 -0700

    Show the progress of data streaming and index build
    
    patch by Stefan Miklosovic; reviewed by Benjamin Lerer, Berenguer Blasi, David Capwell for CASSANDRA-15406
---
 CHANGES.txt                                        |   1 +
 .../streaming/CassandraCompressedStreamReader.java |   2 +-
 .../db/streaming/CassandraStreamReader.java        |   2 +-
 .../apache/cassandra/streaming/ProgressInfo.java   |   2 +-
 .../cassandra/streaming/StreamReceiveTask.java     |   4 +-
 .../apache/cassandra/tools/nodetool/NetStats.java  |  64 ++-
 .../cassandra/distributed/impl/Instance.java       |  14 +-
 .../shared/NodeToolResultWithOutput.java           |  48 ++
 .../test/AbstractNetstatsBootstrapStreaming.java   |  85 ++++
 .../test/AbstractNetstatsStreaming.java            | 548 +++++++++++++++++++++
 ...WithEntireSSTablesCompressionStreamingTest.java |  36 ++
 ...houtEntireSSTablesCompressionStreamingTest.java |  36 ++
 .../test/NetstatsRepairStreamingTest.java          |  88 ++++
 .../cassandra/distributed/util/NodetoolUtils.java  |  68 +++
 14 files changed, 978 insertions(+), 20 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4916faa..42b7aa7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
  * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
  * NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949)
+ * Show the progress of data streaming and index build (CASSANDRA-15406)
 Merged from 3.11:
  * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
  * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index 2491fe1..ff9e6f7 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -97,7 +97,7 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
                 {
                     writePartition(deserializer, writer);
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(filename, ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
+                    session.progress(filename + '-' + fileSeqNum, ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
                 }
                 assert in.getBytesRead() == sectionLength;
             }
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 686d874..6835fad 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -124,7 +124,7 @@ public class CassandraStreamReader implements IStreamReader
             {
                 writePartition(deserializer, writer);
                 // TODO move this to BytesReadTracker
-                session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+                session.progress(writer.getFilename() + '-' + fileSeqNum, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
                          session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index ac91855..2b306f8 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -110,7 +110,7 @@ public class ProgressInfo implements Serializable
     {
         StringBuilder sb = new StringBuilder(fileName);
         sb.append(" ").append(currentBytes);
-        sb.append("/").append(totalBytes).append(" bytes");
+        sb.append("/").append(totalBytes).append(" bytes ");
         sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
         sb.append(direction == Direction.OUT ? "sent to " : "received from ");
         sb.append("idx:").append(sessionIndex);
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 25977a5..d127edb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -85,8 +85,8 @@ public class StreamReceiveTask extends StreamTask
         remoteStreamsReceived += stream.getNumFiles();
         bytesReceived += stream.getSize();
         Preconditions.checkArgument(tableId.equals(stream.getTableId()));
-        logger.debug("received {} of {} total files {} of total bytes {}", remoteStreamsReceived, totalStreams,
-                     bytesReceived, totalSize);
+        logger.debug("received {} of {} total files, {} of total bytes {}", remoteStreamsReceived, totalStreams,
+                     bytesReceived, stream.getSize());
 
         receiver.received(stream);
 
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
index c0500ca..e86505b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
@@ -59,10 +59,29 @@ public class NetStats extends NodeToolCmd
                 System.out.printf("%n");
                 if (!info.receivingSummaries.isEmpty())
                 {
+                    long totalFilesToReceive = info.getTotalFilesToReceive();
+                    long totalBytesToReceive = info.getTotalSizeToReceive();
+                    long totalFilesReceived = info.getTotalFilesReceived();
+                    long totalSizeReceived = info.getTotalSizeReceived();
+                    double percentageFilesReceived = ((double) totalFilesReceived / totalFilesToReceive) * 100;
+                    double percentageSizesReceived = ((double) totalSizeReceived / totalBytesToReceive) * 100;
+
                     if (humanReadable)
-                        System.out.printf("        Receiving %d files, %s total. Already received %d files, %s total%n", info.getTotalFilesToReceive(), FileUtils.stringifyFileSize(info.getTotalSizeToReceive()), info.getTotalFilesReceived(), FileUtils.stringifyFileSize(info.getTotalSizeReceived()));
+                        System.out.printf("        Receiving %d files, %s total. Already received %d files (%.2f%%), %s total (%.2f%%)%n",
+                                          totalFilesToReceive,
+                                          FileUtils.stringifyFileSize(totalBytesToReceive),
+                                          totalFilesReceived,
+                                          percentageFilesReceived,
+                                          FileUtils.stringifyFileSize(totalSizeReceived),
+                                          percentageSizesReceived);
                     else
-                        System.out.printf("        Receiving %d files, %d bytes total. Already received %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive(), info.getTotalFilesReceived(), info.getTotalSizeReceived());
+                        System.out.printf("        Receiving %d files, %d bytes total. Already received %d files (%.2f%%), %d bytes total (%.2f%%)%n",
+                                          totalFilesToReceive,
+                                          totalBytesToReceive,
+                                          totalFilesReceived,
+                                          percentageFilesReceived,
+                                          totalSizeReceived,
+                                          percentageSizesReceived);
                     for (ProgressInfo progress : info.getReceivingFiles())
                     {
                         System.out.printf("            %s%n", progress.toString(printPort));
@@ -70,10 +89,29 @@ public class NetStats extends NodeToolCmd
                 }
                 if (!info.sendingSummaries.isEmpty())
                 {
+                    long totalFilesToSend = info.getTotalFilesToSend();
+                    long totalSizeToSend = info.getTotalSizeToSend();
+                    long totalFilesSent = info.getTotalFilesSent();
+                    long totalSizeSent = info.getTotalSizeSent();
+                    double percentageFilesSent = ((double) totalFilesSent / totalFilesToSend) * 100;
+                    double percentageSizeSent = ((double) totalSizeSent / totalSizeToSend) * 100;
+
                     if (humanReadable)
-                        System.out.printf("        Sending %d files, %s total. Already sent %d files, %s total%n", info.getTotalFilesToSend(), FileUtils.stringifyFileSize(info.getTotalSizeToSend()), info.getTotalFilesSent(), FileUtils.stringifyFileSize(info.getTotalSizeSent()));
+                        System.out.printf("        Sending %d files, %s total. Already sent %d files (%.2f%%), %s total (%.2f%%)%n",
+                                          totalFilesToSend,
+                                          FileUtils.stringifyFileSize(totalSizeToSend),
+                                          totalFilesSent,
+                                          percentageFilesSent,
+                                          FileUtils.stringifyFileSize(totalSizeSent),
+                                          percentageSizeSent);
                     else
-                        System.out.printf("        Sending %d files, %d bytes total. Already sent %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend(), info.getTotalFilesSent(), info.getTotalSizeSent());
+                        System.out.printf("        Sending %d files, %d bytes total. Already sent %d files (%.2f%%), %d bytes total (%.2f%%) %n",
+                                          totalFilesToSend,
+                                          totalSizeToSend,
+                                          totalFilesSent,
+                                          percentageFilesSent,
+                                          totalSizeSent,
+                                          percentageSizeSent);
                     for (ProgressInfo progress : info.getSendingFiles())
                     {
                         System.out.printf("            %s%n", progress.toString(printPort));
@@ -98,35 +136,35 @@ public class NetStats extends NodeToolCmd
             long dropped;
 
             pending = 0;
-            for (int n : ms.getLargeMessagePendingTasks().values())
+            for (int n : ms.getLargeMessagePendingTasksWithPort().values())
                 pending += n;
             completed = 0;
-            for (long n : ms.getLargeMessageCompletedTasks().values())
+            for (long n : ms.getLargeMessageCompletedTasksWithPort().values())
                 completed += n;
             dropped = 0;
-            for (long n : ms.getLargeMessageDroppedTasks().values())
+            for (long n : ms.getLargeMessageDroppedTasksWithPort().values())
                 dropped += n;
             System.out.printf("%-25s%10s%10s%15s%10s%n", "Large messages", "n/a", pending, completed, dropped);
 
             pending = 0;
-            for (int n : ms.getSmallMessagePendingTasks().values())
+            for (int n : ms.getSmallMessagePendingTasksWithPort().values())
                 pending += n;
             completed = 0;
-            for (long n : ms.getSmallMessageCompletedTasks().values())
+            for (long n : ms.getSmallMessageCompletedTasksWithPort().values())
                 completed += n;
             dropped = 0;
-            for (long n : ms.getSmallMessageDroppedTasks().values())
+            for (long n : ms.getSmallMessageDroppedTasksWithPort().values())
                 dropped += n;
             System.out.printf("%-25s%10s%10s%15s%10s%n", "Small messages", "n/a", pending, completed, dropped);
 
             pending = 0;
-            for (int n : ms.getGossipMessagePendingTasks().values())
+            for (int n : ms.getGossipMessagePendingTasksWithPort().values())
                 pending += n;
             completed = 0;
-            for (long n : ms.getGossipMessageCompletedTasks().values())
+            for (long n : ms.getGossipMessageCompletedTasksWithPort().values())
                 completed += n;
             dropped = 0;
-            for (long n : ms.getGossipMessageDroppedTasks().values())
+            for (long n : ms.getGossipMessageDroppedTasksWithPort().values())
                 dropped += n;
             System.out.printf("%-25s%10s%10s%15s%10s%n", "Gossip messages", "n/a", pending, completed, dropped);
         }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 5395cb8..038698b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -605,18 +605,28 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }).call();
     }
 
-    private static class DTestNodeTool extends NodeTool {
+    public static class DTestNodeTool extends NodeTool {
         private final StorageServiceMBean storageProxy;
         private final CollectingNotificationListener notifications = new CollectingNotificationListener();
 
         private Throwable latestError;
 
-        DTestNodeTool(boolean withNotifications) {
+        public DTestNodeTool(boolean withNotifications) {
             super(new InternalNodeProbeFactory(withNotifications));
             storageProxy = new InternalNodeProbe(withNotifications).getStorageService();
             storageProxy.addNotificationListener(notifications, null, null);
         }
 
+        public List<Notification> getNotifications()
+        {
+            return new ArrayList<>(notifications.notifications);
+        }
+
+        public Throwable getLatestError()
+        {
+            return latestError;
+        }
+
         public int execute(String... args)
         {
             try
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java b/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java
new file mode 100644
index 0000000..cb94887
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.cassandra.distributed.api.NodeToolResult;
+
+public class NodeToolResultWithOutput
+{
+    private final NodeToolResult result;
+    private final ByteArrayOutputStream stdout;
+    private final ByteArrayOutputStream stderr;
+
+    public NodeToolResultWithOutput(NodeToolResult result, ByteArrayOutputStream stdout, ByteArrayOutputStream stderr) {
+        this.result = result;
+        this.stdout = stdout;
+        this.stderr = stderr;
+    }
+
+    public NodeToolResult getResult() {
+        return this.result;
+    }
+
+    public String getStdout() {
+        return this.stdout.toString();
+    }
+
+    public String getStderr() {
+        return this.stderr.toString();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
new file mode 100644
index 0000000..7aca7bd
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstatsStreaming
+{
+    protected void executeTest(final boolean streamEntireSSTables,
+                               final boolean compressionEnabled) throws Exception
+    {
+        final Cluster.Builder builder = builder().withNodes(1)
+                                                 .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(2))
+                                                 .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+                                                 .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
+                                                                             .set("stream_throughput_outbound_megabits_per_sec", 1)
+                                                                             .set("compaction_throughput_mb_per_sec", 1)
+                                                                             .set("stream_entire_sstables", streamEntireSSTables));
+
+        try (final Cluster cluster = builder.withNodes(1).start())
+        {
+            // populate data only against 1 node first
+
+            createTable(cluster, 1, compressionEnabled);
+
+            cluster.get(1).nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
+
+            if (compressionEnabled)
+            {
+                populateData(true);
+            }
+            else
+            {
+                populateData(false);
+            }
+
+            cluster.get(1).flush("netstats_test");
+
+            // then bootstrap the second one, upon joining,
+            // we should see that netstats shows how SSTables are being streamed on the first node
+
+            final IInstanceConfig config = cluster.newInstanceConfig();
+            config.set("auto_bootstrap", true);
+
+            IInvokableInstance secondNode = cluster.bootstrap(config);
+
+            final Future<?> startupRunnable = executorService.submit((Runnable) secondNode::startup);
+            final Future<AbstractNetstatsStreaming.NetstatResults> netstatsFuture = executorService.submit(new NetstatsCallable(cluster.get(1)));
+
+            final AbstractNetstatsStreaming.NetstatResults results = netstatsFuture.get(1, MINUTES);
+            startupRunnable.get(2, MINUTES);
+
+            results.assertSuccessful();
+
+            AbstractNetstatsStreaming.NetstatsOutputParser.validate(AbstractNetstatsStreaming.NetstatsOutputParser.parse(results));
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java
new file mode 100644
index 0000000..85e3e23
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NodeToolResultWithOutput;
+import org.apache.cassandra.distributed.util.NodetoolUtils;
+import org.apache.cassandra.utils.Pair;
+
+import static java.util.stream.Collectors.toList;
+
+public abstract class AbstractNetstatsStreaming extends TestBaseImpl
+{
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractNetstatsStreaming.class);
+
+    protected ExecutorService executorService;
+
+    @Before
+    public void setup()
+    {
+        executorService = Executors.newCachedThreadPool();
+    }
+
+    @After
+    public void teardown() throws Exception // stops the netstats executor after each test and drops the reference
+    {
+        try
+        {
+            executorService.shutdownNow();
+
+            if (!executorService.isTerminated()) // isShutdown() is always true right after shutdownNow(), so check for actual termination
+            {
+                if (!executorService.awaitTermination(1, TimeUnit.MINUTES))
+                {
+                    throw new IllegalStateException("Unable to shutdown executor for invoking netstat commands.");
+                }
+            }
+        }
+        finally
+        {
+            executorService = null;
+        }
+    }
+
+    protected void changeReplicationFactor()
+    {
+        try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+             Session s = c.connect())
+        {
+            s.execute("ALTER KEYSPACE netstats_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };");
+        }
+    }
+
+    protected void createTable(Cluster cluster, int replicationFactor, boolean compressionEnabled)
+    {
+        // the keyspace's replication factor is supplied by the caller
+        cluster.schemaChange("CREATE KEYSPACE netstats_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
+
+        if (compressionEnabled)
+        {
+            cluster.schemaChange("CREATE TABLE netstats_test.test_table (id uuid primary key) WITH compression = {'enabled':'true', 'class': 'LZ4Compressor'};");
+        }
+        else
+        {
+            cluster.schemaChange("CREATE TABLE netstats_test.test_table (id uuid primary key) WITH compression = {'enabled':'false'};");
+        }
+    }
+
+    protected void populateData(boolean forCompressedTest)
+    {
+        try (com.datastax.driver.core.Cluster c = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+             Session s = c.connect("netstats_test"))
+        {
+            int records = forCompressedTest ? 100_000 : 70_000;
+
+            for (int i = 0; i < records; i++)
+            {
+                s.execute("INSERT INTO test_table (id) VALUES (" + UUID.randomUUID() + ")");
+            }
+        }
+    }
+
+    protected static class NetstatsOutputParser
+    {
+        public static List<Pair<ReceivingStastistics, SendingStatistics>> parse(final NetstatResults results)
+        {
+            final Set<String> outputs = new LinkedHashSet<>();
+
+            results.netstatOutputs.stream()
+                                  .map(NodeToolResultWithOutput::getStdout)
+                                  .filter(output -> !output.contains("Not sending any streams"))
+                                  .filter(output -> output.contains("Receiving") || output.contains("Sending"))
+                                  .forEach(outputs::add);
+
+            final List<Pair<ReceivingStastistics, SendingStatistics>> parsed = new ArrayList<>();
+
+            for (final String output : outputs)
+            {
+                boolean processingReceiving = false;
+                boolean processingSending = false;
+
+                final ReceivingStastistics receivingStastistics = new ReceivingStastistics();
+                final SendingStatistics sendingStatistics = new SendingStatistics();
+
+                final List<String> sanitisedOutput = Stream.of(output.split("\n"))
+                                                           .map(String::trim)
+                                                           .filter(line -> !line.isEmpty())
+                                                           // sometimes logs are mangled into output
+                                                           .filter(line -> Stream.of("DEBUG", "INFO", "ERROR", "WARN").noneMatch(line::contains))
+                                                           .filter(line -> Stream.of("Mode:", "Read", "Attempted", "Mismatch", "Pool", "Large", "Small", "Gossip").noneMatch(line::startsWith))
+                                                           .collect(toList());
+
+                for (final String outputLine : sanitisedOutput)
+                {
+                    if (outputLine.startsWith("Receiving"))
+                    {
+                        processingReceiving = true;
+                        processingSending = false;
+
+                        receivingStastistics.parseHeader(outputLine);
+                    }
+                    else if (outputLine.startsWith("Sending"))
+                    {
+                        processingSending = true;
+                        processingReceiving = false;
+
+                        sendingStatistics.parseHeader(outputLine);
+                    }
+                    else if (processingReceiving)
+                    {
+                        receivingStastistics.parseTable(outputLine);
+                    }
+                    else if (processingSending)
+                    {
+                        sendingStatistics.parseTable(outputLine);
+                    }
+                }
+
+                parsed.add(Pair.create(receivingStastistics, sendingStatistics));
+            }
+
+            return parsed;
+        }
+
+        public static void validate(List<Pair<ReceivingStastistics, SendingStatistics>> result)
+        {
+            List<SendingStatistics> sendingStatistics = result.stream().map(pair -> pair.right).collect(toList());
+
+            if (sendingStatistics.size() >= 2)
+            {
+                for (int i = 0; i < sendingStatistics.size() - 1; i++)
+                {
+                    SendingStatistics.SendingHeader header1 = sendingStatistics.get(i).sendingHeader;
+                    SendingStatistics.SendingHeader header2 = sendingStatistics.get(i + 1).sendingHeader;
+
+                    if (header1 != null && header2 != null)
+                    {
+                        Assert.assertTrue(header1.compareTo(header2) <= 0);
+                    }
+                }
+            }
+
+            for (SendingStatistics sending : sendingStatistics)
+            {
+                if (sending.sendingHeader != null)
+                {
+                    Assert.assertEquals(sending.sendingHeader.bytesTotalSoFar, (long) sending.sendingSSTable.stream().map(table -> table.bytesSent).reduce(Long::sum).orElseGet(() -> 0L));
+                    Assert.assertTrue(sending.sendingHeader.bytesTotal >= sending.sendingSSTable.stream().map(table -> table.bytesInTotal).reduce(Long::sum).orElseGet(() -> 0L));
+
+                    if (sending.sendingHeader.bytesTotalSoFar != 0)
+                    {
+                        double progress = (double) sending.sendingSSTable.stream().map(table -> table.bytesSent).reduce(Long::sum).orElse(0L) / (double) sending.sendingHeader.bytesTotal;
+
+                        Assert.assertTrue((int) sending.sendingHeader.progressBytes >= (int) (progress * 100));
+
+                        Assert.assertTrue((double) sending.sendingHeader.bytesTotal >= (double) sending.sendingSSTable.stream().map(table -> table.bytesInTotal).reduce(Long::sum).orElse(0L));
+                    }
+                }
+            }
+
+            List<ReceivingStastistics> receivingStastistics = result.stream().map(pair -> pair.left).collect(toList());
+
+            for (ReceivingStastistics receiving : receivingStastistics)
+            {
+                if (receiving.receivingHeader != null)
+                {
+                    Assert.assertTrue(receiving.receivingHeader.bytesTotal >= receiving.receivingTables.stream().map(table -> table.receivedSoFar).reduce(Long::sum).orElse(0L));
+                    Assert.assertEquals(receiving.receivingHeader.bytesTotalSoFar, (long) receiving.receivingTables.stream().map(table -> table.receivedSoFar).reduce(Long::sum).orElse(0L));
+                }
+            }
+        }
+
+        /**
+         * Receiving side of the parsed output: one summary header plus one entry for every
+         * per-file line handed to {@link #parseTable(String)}.
+         */
+        public static class ReceivingStastistics
+        {
+            public ReceivingHeader receivingHeader;
+            public List<ReceivingTable> receivingTables = new ArrayList<>();
+
+            /** Replaces {@link #receivingHeader} with the parsed form of the given line. */
+            public void parseHeader(String header)
+            {
+                receivingHeader = ReceivingHeader.parseHeader(header);
+            }
+
+            /** Parses the given per-file line and appends the result to {@link #receivingTables}. */
+            public void parseTable(String table)
+            {
+                final ReceivingTable parsed = ReceivingTable.parseTable(table);
+                receivingTables.add(parsed);
+            }
+
+            public String toString()
+            {
+                return String.format("ReceivingStastistics{receivingHeader=%s, receivingTables=%s}",
+                                     receivingHeader,
+                                     receivingTables);
+            }
+
+            /** Numbers extracted from a "Receiving ... files, ... bytes total. Already received ..." line. */
+            public static class ReceivingHeader
+            {
+                private static final Pattern HEADER_PATTERN = Pattern.compile(
+                "Receiving (.*) files, (.*) bytes total. Already received (.*) files \\((.*)%\\), (.*) bytes total \\((.*)%\\)"
+                );
+
+                int totalReceiving = 0;
+                long bytesTotal = 0;
+                int alreadyReceived = 0;
+                double progressFiles = 0.0;
+                long bytesTotalSoFar = 0;
+                double progressBytes = 0.0;
+
+                /**
+                 * @param header line to parse, it has to match the header pattern as a whole
+                 * @return header populated from the six captured groups
+                 * @throws IllegalStateException if the line does not match the pattern
+                 */
+                public static ReceivingHeader parseHeader(String header)
+                {
+                    final Matcher groups = HEADER_PATTERN.matcher(header);
+
+                    if (!groups.matches())
+                        throw new IllegalStateException("Header does not match - " + header);
+
+                    final ReceivingHeader parsed = new ReceivingHeader();
+
+                    parsed.totalReceiving = Integer.parseInt(groups.group(1));
+                    parsed.bytesTotal = Long.parseLong(groups.group(2));
+                    parsed.alreadyReceived = Integer.parseInt(groups.group(3));
+                    parsed.progressFiles = Double.parseDouble(groups.group(4));
+                    parsed.bytesTotalSoFar = Long.parseLong(groups.group(5));
+                    parsed.progressBytes = Double.parseDouble(groups.group(6));
+
+                    return parsed;
+                }
+
+                public String toString()
+                {
+                    return String.format("ReceivingHeader{totalReceiving=%s, bytesTotal=%s, alreadyReceived=%s, progressFiles=%s, bytesTotalSoFar=%s, progressBytes=%s}",
+                                         totalReceiving,
+                                         bytesTotal,
+                                         alreadyReceived,
+                                         progressFiles,
+                                         bytesTotalSoFar,
+                                         progressBytes);
+                }
+            }
+
+            /** Numbers extracted from a "... x/y bytes (z%) received from ..." line. */
+            public static class ReceivingTable
+            {
+                private static final Pattern FILE_PATTERN = Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) received from (.*)");
+
+                long receivedSoFar = 0;
+                long toReceive = 0;
+                double progress = 0.0;
+
+                /**
+                 * @param table line to parse, it has to match the file pattern as a whole
+                 * @return entry populated from the second, third and fourth captured group
+                 * @throws IllegalStateException if the line does not match the pattern
+                 */
+                public static ReceivingTable parseTable(String table)
+                {
+                    final Matcher groups = FILE_PATTERN.matcher(table);
+
+                    if (!groups.matches())
+                        throw new IllegalStateException("Table line does not match - " + table);
+
+                    final ReceivingTable parsed = new ReceivingTable();
+
+                    parsed.receivedSoFar = Long.parseLong(groups.group(2));
+                    parsed.toReceive = Long.parseLong(groups.group(3));
+                    parsed.progress = Double.parseDouble(groups.group(4));
+
+                    return parsed;
+                }
+
+                public String toString()
+                {
+                    return String.format("ReceivingTable{receivedSoFar=%s, toReceive=%s, progress=%s}",
+                                         receivedSoFar,
+                                         toReceive,
+                                         progress);
+                }
+            }
+        }
+
+        /**
+         * Sending side of the parsed output: one summary header plus one entry for every
+         * per-file line handed to {@link #parseTable(String)}.
+         */
+        public static class SendingStatistics
+        {
+            public SendingHeader sendingHeader;
+            public List<SendingSSTable> sendingSSTable = new ArrayList<>();
+
+            /** Replaces {@link #sendingHeader} with the parsed form of the given line. */
+            public void parseHeader(String outputLine)
+            {
+                this.sendingHeader = SendingHeader.parseHeader(outputLine);
+            }
+
+            /** Parses the given per-file line and appends the result to {@link #sendingSSTable}. */
+            public void parseTable(String table)
+            {
+                sendingSSTable.add(SendingSSTable.parseTable(table));
+            }
+
+            @Override
+            public String toString()
+            {
+                return "SendingStatistics{" +
+                       "sendingHeader=" + sendingHeader +
+                       ", sendingSSTable=" + sendingSSTable +
+                       '}';
+            }
+
+            /**
+             * Numbers extracted from a "Sending ... files, ... bytes total. Already sent ..." line.
+             * Headers are ordered by progress, see {@link #compareTo(SendingHeader)}.
+             */
+            public static class SendingHeader implements Comparable<SendingHeader>
+            {
+                private static final Pattern sendingHeaderPattern = Pattern.compile(
+                "Sending (.*) files, (.*) bytes total. Already sent (.*) files \\((.*)%\\), (.*) bytes total \\((.*)%\\)"
+                );
+
+                int totalSending = 0;
+                long bytesTotal = 0;
+                int alreadySent = 0;
+                double progressFiles = 0.0;
+                long bytesTotalSoFar = 0;
+                double progressBytes = 0.0;
+
+                /**
+                 * @param header line to parse, it has to match the header pattern as a whole
+                 * @return header populated from the six captured groups
+                 * @throws IllegalStateException if the line does not match the pattern
+                 */
+                public static SendingHeader parseHeader(String header)
+                {
+                    final Matcher matcher = sendingHeaderPattern.matcher(header);
+
+                    if (!matcher.matches())
+                        throw new IllegalStateException("Header does not match - " + header);
+
+                    final SendingHeader sendingHeader = new SendingHeader();
+
+                    sendingHeader.totalSending = Integer.parseInt(matcher.group(1));
+                    sendingHeader.bytesTotal = Long.parseLong(matcher.group(2));
+                    sendingHeader.alreadySent = Integer.parseInt(matcher.group(3));
+                    sendingHeader.progressFiles = Double.parseDouble(matcher.group(4));
+                    sendingHeader.bytesTotalSoFar = Long.parseLong(matcher.group(5));
+                    sendingHeader.progressBytes = Double.parseDouble(matcher.group(6));
+
+                    return sendingHeader;
+                }
+
+                @Override
+                public String toString()
+                {
+                    return "SendingHeader{" +
+                           "totalSending=" + totalSending +
+                           ", bytesTotal=" + bytesTotal +
+                           ", alreadySent=" + alreadySent +
+                           ", progressFiles=" + progressFiles +
+                           ", bytesTotalSoFar=" + bytesTotalSoFar +
+                           ", progressBytes=" + progressBytes +
+                           '}';
+                }
+
+                /**
+                 * Orders two headers by the progress they report, looking at {@code alreadySent},
+                 * {@code progressFiles}, {@code bytesTotalSoFar} and {@code progressBytes}.
+                 *
+                 * @return 0 when all four metrics are equal, -1 when this header is not ahead of {@code o}
+                 *         in any of them, 1 when it is not behind {@code o} in any of them
+                 * @throws IllegalStateException when the metrics disagree, i.e. this header is ahead
+                 *                               in one metric and behind in another
+                 */
+                @Override
+                public int compareTo(SendingHeader o)
+                {
+                    // Equality has to be tested first: the "not ahead in any metric" test below also
+                    // holds for identical headers and would otherwise report them as strictly smaller,
+                    // which breaks x.compareTo(x) == 0.
+                    if (alreadySent == o.alreadySent
+                        && progressFiles == o.progressFiles
+                        && bytesTotalSoFar == o.bytesTotalSoFar
+                        && progressBytes == o.progressBytes)
+                    {
+                        return 0;
+                    }
+
+                    if (alreadySent <= o.alreadySent
+                        && progressFiles <= o.progressFiles
+                        && bytesTotalSoFar <= o.bytesTotalSoFar
+                        && progressBytes <= o.progressBytes)
+                    {
+                        return -1;
+                    }
+
+                    // Exact mirror of the branch above so that a.compareTo(b) and b.compareTo(a)
+                    // always have opposite signs; equality was already ruled out.
+                    if (alreadySent >= o.alreadySent
+                        && progressFiles >= o.progressFiles
+                        && bytesTotalSoFar >= o.bytesTotalSoFar
+                        && progressBytes >= o.progressBytes)
+                    {
+                        return 1;
+                    }
+
+                    throw new IllegalStateException(String.format("Could not compare arguments %s and %s", this, o));
+                }
+            }
+
+            /** Numbers extracted from a "... x/y bytes (z%) sent to ..." line. */
+            public static class SendingSSTable
+            {
+                private static final Pattern sendingFilePattern = Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) sent to (.*)");
+
+                long bytesSent = 0;
+                long bytesInTotal = 0;
+                double progress = 0.0;
+
+                /**
+                 * @param table line to parse, it has to match the file pattern as a whole
+                 * @return entry populated from the second, third and fourth captured group
+                 * @throws IllegalStateException if the line does not match the pattern
+                 */
+                public static SendingSSTable parseTable(String table)
+                {
+                    final Matcher matcher = sendingFilePattern.matcher(table);
+
+                    if (!matcher.matches())
+                        throw new IllegalStateException("Table does not match - " + table);
+
+                    final SendingSSTable sendingSSTable = new SendingSSTable();
+
+                    sendingSSTable.bytesSent = Long.parseLong(matcher.group(2));
+                    sendingSSTable.bytesInTotal = Long.parseLong(matcher.group(3));
+                    sendingSSTable.progress = Double.parseDouble(matcher.group(4));
+
+                    return sendingSSTable;
+                }
+
+                @Override
+                public String toString()
+                {
+                    return "SendingSSTable{" +
+                           "bytesSent=" + bytesSent +
+                           ", bytesInTotal=" + bytesInTotal +
+                           ", progress=" + progress +
+                           '}';
+                }
+            }
+        }
+    }
+
+    /**
+     * Collects the results of repeated netstats invocations so that they can be verified together.
+     */
+    protected static final class NetstatResults
+    {
+        private final List<NodeToolResultWithOutput> netstatOutputs = new ArrayList<>();
+
+        /** Appends one more invocation result. */
+        public void add(NodeToolResultWithOutput result)
+        {
+            netstatOutputs.add(result);
+        }
+
+        /**
+         * Fails unless every collected invocation returned exit code 0 and wrote nothing to stderr.
+         */
+        public void assertSuccessful()
+        {
+            for (final NodeToolResultWithOutput result : netstatOutputs)
+            {
+                // JUnit takes (message, expected, actual); with the operands the other way round
+                // a failure would report the expected and the actual value swapped.
+                Assert.assertEquals("Unexpected netstats exit code, stderr: " + result.getStderr(),
+                                    0,
+                                    result.getResult().getRc());
+                Assert.assertTrue("Unexpected netstats stderr: " + result.getStderr(),
+                                  result.getStderr().isEmpty());
+            }
+        }
+    }
+
+    /**
+     * Polls {@code netstats} on one node every 500 ms and collects the results. Polling stops
+     * with the first output that mentions neither "Receiving" nor "Sending" after at least one
+     * earlier output mentioned one of them; that last output is not collected.
+     */
+    protected static class NetstatsCallable implements Callable<NetstatResults>
+    {
+        private static final long POLL_INTERVAL_MILLIS = 500;
+
+        private final IInvokableInstance node;
+
+        public NetstatsCallable(final IInvokableInstance node)
+        {
+            this.node = node;
+        }
+
+        /**
+         * @return all results gathered until streaming output disappeared again
+         * @throws InterruptedException if the polling thread is interrupted while waiting between
+         *                              two polls, which makes the otherwise endless loop cancellable
+         */
+        public NetstatResults call() throws Exception
+        {
+            final NetstatResults results = new NetstatResults();
+
+            boolean sawAnyStreamingOutput = false;
+
+            while (true)
+            {
+                try
+                {
+                    final NodeToolResultWithOutput result = NodetoolUtils.nodetool(node, false, "netstats");
+                    final String stdout = result.getStdout();
+
+                    logger.info(node.broadcastAddress().toString() + " " + stdout);
+
+                    final boolean streaming = stdout.contains("Receiving") || stdout.contains("Sending");
+
+                    if (streaming)
+                    {
+                        sawAnyStreamingOutput = true;
+                    }
+                    else if (sawAnyStreamingOutput)
+                    {
+                        // streaming was visible before and is gone now, nothing more to observe
+                        break;
+                    }
+
+                    results.add(result);
+                }
+                catch (final Exception ex)
+                {
+                    // best effort: a failed poll is logged and retried on the next round
+                    logger.warn("Unable to get netstats output, will retry", ex);
+                }
+
+                // Deliberately outside of the try block: an InterruptedException has to end the loop
+                // instead of being swallowed by the catch-all above.
+                Thread.sleep(POLL_INTERVAL_MILLIS);
+            }
+
+            return results;
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
new file mode 100644
index 0000000..7c53426
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+/**
+ * Runs the inherited {@code executeTest} with its first flag switched on, once per value of the
+ * second flag. Judging by the test names the first flag selects streaming of entire SSTables and
+ * the second one compression - confirm against the parent class.
+ */
+public class NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest extends AbstractNetstatsBootstrapStreaming
+{
+    private static final boolean STREAM_ENTIRE_SSTABLES = true;
+
+    @Test
+    public void testWithStreamingEntireSSTablesWithCompression() throws Exception
+    {
+        final boolean compression = true;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+
+    @Test
+    public void testWithStreamingEntireSSTablesWithoutCompression() throws Exception
+    {
+        final boolean compression = false;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java
new file mode 100644
index 0000000..68b16c2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+/**
+ * Runs the inherited {@code executeTest} with its first flag switched off, once per value of the
+ * second flag. Judging by the test names the first flag selects streaming of entire SSTables and
+ * the second one compression - confirm against the parent class.
+ */
+public class NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest extends AbstractNetstatsBootstrapStreaming
+{
+    private static final boolean STREAM_ENTIRE_SSTABLES = false;
+
+    @Test
+    public void testWithoutStreamingEntireSSTablesWithCompression() throws Exception
+    {
+        final boolean compression = true;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+
+    @Test
+    public void testWithoutStreamingEntireSSTablesWithoutCompression() throws Exception
+    {
+        final boolean compression = false;
+        executeTest(STREAM_ENTIRE_SSTABLES, compression);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java
new file mode 100644
index 0000000..5f74c77
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+/**
+ * Checks the netstats output produced while a repair streams data between two nodes.
+ */
+public class NetstatsRepairStreamingTest extends AbstractNetstatsStreaming
+{
+    @Test
+    public void testWithCompressionEnabled() throws Exception
+    {
+        executeTest(true);
+    }
+
+    @Test
+    public void testWithCompressionDisabled() throws Exception
+    {
+        executeTest(false);
+    }
+
+    /**
+     * Starts a two node cluster, loads data, raises the replication factor and repairs node 1
+     * while a background task polls netstats on that node; the collected output is then validated.
+     *
+     * @param compressionEnabled forwarded to {@code createTable} and {@code populateData}
+     */
+    private void executeTest(boolean compressionEnabled) throws Exception
+    {
+        final ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+        try (final Cluster cluster = Cluster.build()
+                                            .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+                                            .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
+                                                                        .set("stream_throughput_outbound_megabits_per_sec", 1)
+                                                                        .set("compaction_throughput_mb_per_sec", 1)
+                                                                        .set("stream_entire_sstables", false)).start())
+        {
+            final IInvokableInstance node1 = cluster.get(1);
+            final IInvokableInstance node2 = cluster.get(2);
+
+            createTable(cluster, 1, compressionEnabled);
+
+            node1.nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
+            node2.nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
+
+            populateData(compressionEnabled);
+
+            node1.flush("netstats_test");
+            node2.flush("netstats_test");
+
+            // change RF from 1 to 2 so the data needs to be repaired; the repair causes streaming which is shown in netstats
+            changeReplicationFactor();
+
+            final Future<NetstatResults> resultsFuture1 = executorService.submit(new NetstatsCallable(node1));
+
+            node1.nodetoolResult("repair", "netstats_test").asserts().success();
+
+            final NetstatResults results = resultsFuture1.get(1, MINUTES);
+
+            results.assertSuccessful();
+
+            NetstatsOutputParser.validate(NetstatsOutputParser.parse(results));
+        }
+        finally
+        {
+            // the pool owns a non-daemon thread; without a shutdown it outlives every test run
+            executorService.shutdownNow();
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java b/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java
new file mode 100644
index 0000000..1bb6adf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.distributed.shared.NodeToolResultWithOutput;
+
+/**
+ * Static helpers which run a nodetool command on an instance and hand back its result together
+ * with everything the command wrote to {@code System.out} and {@code System.err}.
+ */
+public final class NodetoolUtils
+{
+    private NodetoolUtils()
+    {
+        // static helpers only
+    }
+
+    /**
+     * Shortcut for {@link #nodetool(IInvokableInstance, boolean, String...)} passing {@code true}
+     * for {@code withNotifications}.
+     */
+    public static NodeToolResultWithOutput nodetool(IInvokableInstance inst, String... args)
+    {
+        final boolean withNotifications = true;
+        return nodetool(inst, withNotifications, args);
+    }
+
+    /**
+     * Executes nodetool with the given arguments on the instance. For the duration of the call
+     * {@code System.out} and {@code System.err} are replaced with in-memory streams; the previous
+     * streams are put back afterwards, also when the execution throws.
+     *
+     * @param inst              instance to run the command on
+     * @param withNotifications handed over to the {@code DTestNodeTool} constructor
+     * @param args              nodetool arguments
+     * @return the nodetool result bundled with the two captured streams
+     */
+    public static NodeToolResultWithOutput nodetool(IInvokableInstance inst, boolean withNotifications, String... args)
+    {
+        return inst.callOnInstance(() -> {
+            final PrintStream previousOut = System.out;
+            final PrintStream previousErr = System.err;
+
+            // push out whatever is still pending before the streams get swapped
+            previousOut.flush();
+            previousErr.flush();
+
+            final ByteArrayOutputStream capturedOut = new ByteArrayOutputStream();
+            final ByteArrayOutputStream capturedErr = new ByteArrayOutputStream();
+
+            try (PrintStream redirectedOut = new PrintStream(capturedOut);
+                 PrintStream redirectedErr = new PrintStream(capturedErr))
+            {
+                System.setOut(redirectedOut);
+                System.setErr(redirectedErr);
+
+                final Instance.DTestNodeTool tool = new Instance.DTestNodeTool(withNotifications);
+                final int exitCode = tool.execute(args);
+                final NodeToolResult outcome = new NodeToolResult(args, exitCode, tool.getNotifications(), tool.getLatestError());
+
+                return new NodeToolResultWithOutput(outcome, capturedOut, capturedErr);
+            }
+            finally
+            {
+                System.setOut(previousOut);
+                System.setErr(previousErr);
+            }
+        });
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org