You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2023/01/10 20:57:35 UTC

[GitHub] [cassandra] jonmeredith commented on a diff in pull request #2066: Cassandra 18110

jonmeredith commented on code in PR #2066:
URL: https://github.com/apache/cassandra/pull/2066#discussion_r1065229104


##########
test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java:
##########
@@ -37,18 +38,21 @@ public class RebuildStreamingTest extends TestBaseImpl
     @Test
     public void test() throws IOException
     {
+        ByteBuffer blob = ByteBuffer.wrap(new byte[1 << 16]);
         try (Cluster cluster = init(Cluster.build(2)
                                            .withConfig(c -> c.with(Feature.values()).set("stream_entire_sstables", false))
                                            .start()))
         {

Review Comment:
   Can we make this a parametrized tests and check with entire sstable streaming active too? The only change you have to make is the expected number of files increases by 7x in the assertion as entire sstable streaming counts each component it sends. Alternately we could modify the accounting to only increment the file count on the DATA file?



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent event)
         {
             logger.warn("Unexpected exception handling stream event", t);
         }
-        sessions = Sessions.create(streamProgress.values());
         lastUpdatedAtNanos = Clock.Global.nanoTime();
     }
 
     private void streamPrepared(StreamEvent.SessionPreparedEvent event)
     {
-        SessionInfo session = new SessionInfo(event.session);
-        streamProgress.putIfAbsent(session.peer, session);
+        SessionInfo session = event.session;
+        peers.add(session.peer);
+        // only update stats on ACK to avoid duplication
+        if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+            return;
+        sessions.bytesToReceive += session.getTotalSizeToReceive();
+        sessions.bytesToSend += session.getTotalSizeToSend();
+
+        sessions.filesToReceive += session.getTotalFilesToReceive();
+        sessions.filesToSend += session.getTotalFilesToSend();
     }
 
     private void streamProgress(StreamEvent.ProgressEvent event)
     {
-        SessionInfo info = streamProgress.get(event.progress.peer);
-        if (info != null)
+        ProgressInfo info = event.progress;
+        Pair<InetAddressAndPort, String> key = Pair.create(info.peer, info.fileName);
+        long seenBytes = activeFiles.getOrDefault(key, 0);
+        long delta = info.currentBytes - seenBytes;
+        if (info.direction == ProgressInfo.Direction.IN)
         {
-            info.updateProgress(event.progress);
+            // receiving
+            sessions.bytesReceived += delta;
+            if (info.isCompleted())
+                sessions.filesReceived++;
         }
         else
         {
-            logger.warn("[Stream #{}} ID#{}] Recieved stream progress before prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+            // sending
+            sessions.bytesSent += delta;
+            if (info.isCompleted())
+                sessions.filesSent++;
         }
+        activeFiles.put(key, info.currentBytes);

Review Comment:
   Is it ok that `activeFiles` gradually grows to the total size of streams transferred and includes completed files once they are transferred. I suppose we need this if we want to check for non-negative deltas (or even better positive, but I'm not sure that's guaranteed).



##########
src/java/org/apache/cassandra/streaming/StreamResultFuture.java:
##########
@@ -219,6 +226,10 @@ synchronized void fireStreamEvent(StreamEvent event)
                 logger.warn("Unexpected exception in listern while calling handleStreamEvent", t);
             }
         }
+        long totalNanos = nanoTime() - startNanos;
+        if (totalNanos > slowEventsLogTimeoutNanos)
+            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}",

Review Comment:
   nit: include unit of nanos in the message?  Very low level nit as anybody investigating thing will be able to work it out from code.



##########
test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java:
##########
@@ -37,18 +38,21 @@ public class RebuildStreamingTest extends TestBaseImpl
     @Test
     public void test() throws IOException
     {
+        ByteBuffer blob = ByteBuffer.wrap(new byte[1 << 16]);
         try (Cluster cluster = init(Cluster.build(2)
                                            .withConfig(c -> c.with(Feature.values()).set("stream_entire_sstables", false))
                                            .start()))
         {

Review Comment:
   also, is it worth lowering `streaming_slow_events_timeout` so we check that messages get logged if slow?



##########
src/java/org/apache/cassandra/streaming/StreamingState.java:
##########
@@ -252,52 +239,55 @@ public synchronized void handleStreamEvent(StreamEvent event)
         {
             logger.warn("Unexpected exception handling stream event", t);
         }
-        sessions = Sessions.create(streamProgress.values());
         lastUpdatedAtNanos = Clock.Global.nanoTime();
     }
 
     private void streamPrepared(StreamEvent.SessionPreparedEvent event)
     {
-        SessionInfo session = new SessionInfo(event.session);
-        streamProgress.putIfAbsent(session.peer, session);
+        SessionInfo session = event.session;
+        peers.add(session.peer);
+        // only update stats on ACK to avoid duplication
+        if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+            return;
+        sessions.bytesToReceive += session.getTotalSizeToReceive();
+        sessions.bytesToSend += session.getTotalSizeToSend();
+
+        sessions.filesToReceive += session.getTotalFilesToReceive();
+        sessions.filesToSend += session.getTotalFilesToSend();
     }
 
     private void streamProgress(StreamEvent.ProgressEvent event)
     {
-        SessionInfo info = streamProgress.get(event.progress.peer);
-        if (info != null)
+        ProgressInfo info = event.progress;
+        Pair<InetAddressAndPort, String> key = Pair.create(info.peer, info.fileName);
+        long seenBytes = activeFiles.getOrDefault(key, 0);
+        long delta = info.currentBytes - seenBytes;

Review Comment:
   nit: is it worth a check for a non-negative delta?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org