Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/05/26 22:33:55 UTC

[GitHub] [cassandra] jonmeredith commented on a diff in pull request #1648: CASSANDRA-17663 Ensure FileStreamTask cannot compromise shared channel proxy for system table when interrupted

jonmeredith commented on code in PR #1648:
URL: https://github.com/apache/cassandra/pull/1648#discussion_r883120660


##########
src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java:
##########
@@ -155,21 +155,29 @@ private void setupControlMessageChannel() throws IOException
              *  b) for streaming receiver (note: both initiator and follower can receive streaming files) to reveive files,
              *     in {@link Handler#setupStreamingPipeline}
              */
-            controlChannel = createChannel(StreamingChannel.Kind.CONTROL);
+            controlChannel = createControlChannel();
         }
     }
 
-    private StreamingChannel createChannel(StreamingChannel.Kind kind) throws IOException
+    private StreamingChannel createControlChannel() throws IOException
     {
         logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
 
-        StreamingChannel channel = factory.create(to, messagingVersion, kind);
-        if (kind == StreamingChannel.Kind.CONTROL)
-        {
-            executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
-                                          new StreamDeserializingTask(session, channel, messagingVersion));
-            session.attachInbound(channel);
-        }
+        StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL);
+        executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
+                                      new StreamDeserializingTask(session, channel, messagingVersion));
+        session.attachInbound(channel);
+        session.attachOutbound(channel);
+
+        logger.debug("Creating {}", channel.description());
+        return channel;
+    }
+    
+    private StreamingChannel createFileChannel(InetAddressAndPort connectTo) throws IOException
+    {
+        logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
+
+        StreamingChannel channel = factory.create(to, connectTo, messagingVersion, StreamingChannel.Kind.FILE);
         session.attachOutbound(channel);
 
         logger.debug("Creating {}", channel.description());

Review Comment:
   Similarly, consider logging `Creating file {}` here so the message names the channel kind.



##########
src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java:
##########
@@ -155,21 +155,29 @@ private void setupControlMessageChannel() throws IOException
              *  b) for streaming receiver (note: both initiator and follower can receive streaming files) to reveive files,
              *     in {@link Handler#setupStreamingPipeline}
              */
-            controlChannel = createChannel(StreamingChannel.Kind.CONTROL);
+            controlChannel = createControlChannel();
         }
     }
 
-    private StreamingChannel createChannel(StreamingChannel.Kind kind) throws IOException
+    private StreamingChannel createControlChannel() throws IOException
     {
         logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
 
-        StreamingChannel channel = factory.create(to, messagingVersion, kind);
-        if (kind == StreamingChannel.Kind.CONTROL)
-        {
-            executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
-                                          new StreamDeserializingTask(session, channel, messagingVersion));
-            session.attachInbound(channel);
-        }
+        StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL);
+        executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
+                                      new StreamDeserializingTask(session, channel, messagingVersion));
+        session.attachInbound(channel);
+        session.attachOutbound(channel);
+
+        logger.debug("Creating {}", channel.description());

Review Comment:
   Since we know which kind of channel this is, it would be helpful to log `Creating control {}`.



##########
src/java/org/apache/cassandra/streaming/StreamingChannel.java:
##########
@@ -49,6 +49,14 @@ public static void unsafeSet(StreamingChannel.Factory factory)
         }
 
         StreamingChannel create(InetSocketAddress to, int messagingVersion, Kind kind) throws IOException;
+
+        default StreamingChannel create(InetSocketAddress to,
+                                        InetSocketAddress preferred,
+                                        int messagingVersion,
+                                        StreamingChannel.Kind kind) throws IOException
+        {
+            throw new UnsupportedOperationException();

Review Comment:
   Why provide the default interface method? Are there cases where you wouldn't implement this?
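
To make the trade-off behind this question concrete, here is a minimal sketch of what a throwing default implies. It assumes a simplified, hypothetical `Factory` that returns a `String` rather than a `StreamingChannel`; the class names `LegacyFactory` and `PreferredAwareFactory`, the helper `tryCreate`, and the host names are illustrative only and do not come from the PR.

```java
import java.net.InetSocketAddress;

public class DefaultMethodSketch
{
    /** Hypothetical stand-in for the factory interface; returns a String instead of a channel. */
    public interface Factory
    {
        String create(InetSocketAddress to, int messagingVersion);

        // Same shape as the default in the diff: existing implementations keep compiling,
        // but calling the new overload on them fails at runtime rather than at compile time.
        default String create(InetSocketAddress to, InetSocketAddress preferred, int messagingVersion)
        {
            throw new UnsupportedOperationException();
        }
    }

    /** An implementation written before the overload existed; it inherits the throwing default. */
    public static class LegacyFactory implements Factory
    {
        @Override
        public String create(InetSocketAddress to, int messagingVersion)
        {
            return "channel to " + to.getHostString();
        }
    }

    /** An implementation that overrides the new overload. */
    public static class PreferredAwareFactory extends LegacyFactory
    {
        @Override
        public String create(InetSocketAddress to, InetSocketAddress preferred, int messagingVersion)
        {
            return "channel to " + to.getHostString() + " via " + preferred.getHostString();
        }
    }

    /** Calls the new overload and reports whether the given factory supports it. */
    public static String tryCreate(Factory factory)
    {
        // Unresolved addresses avoid any DNS lookup; the values are placeholders.
        InetSocketAddress to = InetSocketAddress.createUnresolved("peer", 7000);
        InetSocketAddress preferred = InetSocketAddress.createUnresolved("peer-internal", 7000);
        try
        {
            return factory.create(to, preferred, 12);
        }
        catch (UnsupportedOperationException e)
        {
            return "unsupported";
        }
    }

    public static void main(String[] args)
    {
        System.out.println(tryCreate(new LegacyFactory()));
        System.out.println(tryCreate(new PreferredAwareFactory()));
    }
}
```

If every implementation is expected to provide the overload, declaring it without a default body would turn the missing override in `LegacyFactory` into a compile error instead of a runtime exception.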



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

