Posted to pr@cassandra.apache.org by "pkolaczk (via GitHub)" <gi...@apache.org> on 2023/06/15 14:42:34 UTC

[GitHub] [cassandra] pkolaczk opened a new pull request, #2420: Stream all components registered by an sstable

pkolaczk opened a new pull request, #2420:
URL: https://github.com/apache/cassandra/pull/2420

   Custom indexes may register custom components.
   We need to include them in streaming as well.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] pkolaczk commented on pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#issuecomment-1600581931

   I addressed all review comments, except the part about introducing the SAI type for SAI components, because IMHO that needs a wider discussion. 


[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235723689


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    // zero copy streaming sends all components, so the events will include non-Data files as well
+    private static final int NUM_COMPONENTS = 17;
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+            // all cases of the vtable
+            cluster.schemaChange(withKeyspace(
+                "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob) WITH compression = { 'enabled' : false };"
+            ));
+            cluster.schemaChange(withKeyspace(
+                "CREATE CUSTOM INDEX ON %s.test(v) USING 'StorageAttachedIndex';"

Review Comment:
   Once numerics support merges, I guess we'll want to parameterize this test further to use those. The number of expected index components might change as well. CC @mike-tr-adamson 
   
   I'm not sure if that means we should merge this first and adjust inside a rebased CASSANDRA-18067 or the other way around.



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242510685


##########
src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java:
##########
@@ -79,10 +84,20 @@ public enum IndexComponent
      */
     GROUP_COMPLETION_MARKER("GroupComplete");
 
+    public final Component.Type type;
     public final String name;
 
     IndexComponent(String name)
     {
         this.name = name;
+        this.type = componentType(name);
+    }
+
+    private static Component.Type componentType(String name)
+    {
+        return Component.Type.create(SAI_DESCRIPTOR + SAI_SEPARATOR + name.toUpperCase(),

Review Comment:
   nit: `toUpperCase()` seems unnecessary here, as it gives us odd names like "GROUPCOMPLETE". Wouldn't camel case be OK?
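A small illustration of the naming point. It assumes `SAI_DESCRIPTOR` is `"SAI"` and `SAI_SEPARATOR` is `"+"` (assumed values; the real constants are defined in the SAI code), and `SaiTypeNames` is a hypothetical helper rather than code from the PR:

```java
import java.util.Locale;

// Hypothetical helper illustrating two ways of building an SAI component type name.
final class SaiTypeNames
{
    // Assumed values, for illustration only.
    static final String SAI_DESCRIPTOR = "SAI";
    static final String SAI_SEPARATOR = "+";

    // As in the diff: upper-casing drops the word boundary of the camel-case name.
    static String upperCased(String componentName)
    {
        return SAI_DESCRIPTOR + SAI_SEPARATOR + componentName.toUpperCase(Locale.ROOT);
    }

    // As suggested: keep the component name as written.
    static String asWritten(String componentName)
    {
        return SAI_DESCRIPTOR + SAI_SEPARATOR + componentName;
    }

    public static void main(String[] args)
    {
        System.out.println(upperCased("GroupComplete")); // SAI+GROUPCOMPLETE
        System.out.println(asWritten("GroupComplete"));  // SAI+GroupComplete
    }
}
```

With `toUpperCase()` the type name no longer mirrors the camel-case name used in the file name.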



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242508724


##########
src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java:
##########
@@ -79,10 +84,20 @@ public enum IndexComponent
      */
     GROUP_COMPLETION_MARKER("GroupComplete");
 
+    public final Component.Type type;
     public final String name;
 
     IndexComponent(String name)
     {
         this.name = name;
+        this.type = componentType(name);
+    }
+
+    private static Component.Type componentType(String name)
+    {
+        return Component.Type.create(SAI_DESCRIPTOR + SAI_SEPARATOR + name.toUpperCase(),
+                                     SAI_DESCRIPTOR + ".*." + name + ".db",

Review Comment:
   Looking at `Version.defaultFileNameFormat()`, I think you'd want something like:
   
   `SAI_DESCRIPTOR + "\\+.*\\+" + name + ".db"`
   
   `".*."` doesn't need the second period?
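A quick, hypothetical check of the two patterns. It assumes the component part of an SAI file name looks like `SAI+aa+GroupComplete.db` (the `aa` version and the `OtherGroupComplete` name are invented for illustration) and that a type's pattern is applied as a whole-string match. It also escapes the `.db` dot and the component name, which goes one step beyond the suggestion above:

```java
import java.util.regex.Pattern;

// Hypothetical helper comparing the regex from the diff with a stricter variant.
final class SaiComponentRegex
{
    static final String SAI_DESCRIPTOR = "SAI"; // assumed value

    // From the diff: each unescaped '.' matches any character, including '+'.
    static Pattern loose(String name)
    {
        return Pattern.compile(SAI_DESCRIPTOR + ".*." + name + ".db");
    }

    // Along the lines of the review suggestion, also escaping the name and the extension dot.
    static Pattern strict(String name)
    {
        return Pattern.compile(SAI_DESCRIPTOR + "\\+.*\\+" + Pattern.quote(name) + "\\.db");
    }

    // Whole-string match, as opposed to Matcher.find().
    static boolean matches(Pattern pattern, String fileName)
    {
        return pattern.matcher(fileName).matches();
    }

    public static void main(String[] args)
    {
        String expected = "SAI+aa+GroupComplete.db";
        String unrelated = "SAI+aa+OtherGroupComplete.db"; // invented name that ends in "GroupComplete.db"
        System.out.println(matches(loose("GroupComplete"), expected));   // true
        System.out.println(matches(strict("GroupComplete"), expected));  // true
        System.out.println(matches(loose("GroupComplete"), unrelated));  // true (false positive)
        System.out.println(matches(strict("GroupComplete"), unrelated)); // false
    }
}
```

Because `.` matches any character, the pattern from the diff accepts any name that merely ends in `GroupComplete.db`; the `+`-delimited version does not.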



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243310517


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);

Review Comment:
   Fixed



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243357990


##########
src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java:
##########
@@ -79,10 +84,20 @@ public enum IndexComponent
      */
     GROUP_COMPLETION_MARKER("GroupComplete");
 
+    public final Component.Type type;
     public final String name;
 
     IndexComponent(String name)
     {
         this.name = name;
+        this.type = componentType(name);
+    }
+
+    private static Component.Type componentType(String name)
+    {
+        return Component.Type.create(SAI_DESCRIPTOR + SAI_SEPARATOR + name.toUpperCase(),

Review Comment:
   Indeed. Fixed



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235790135


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -61,12 +58,8 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if (!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component detected %s",
-                                            Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
-

Review Comment:
   @pkolaczk @mike-tr-adamson I'm not sure we want to just remove this validation, but checking SAI components raises some other questions. The simplest way around this for now might be...
   
   ```
   Set<Component> unsupported = components.stream()
                                          .filter(c -> !descriptor.getFormat().streamingComponents().contains(c))
                                          .filter(c -> c.type != SSTableFormat.Components.Types.CUSTOM)
                                          .collect(Collectors.toSet());
   
   if (!unsupported.isEmpty())
       throw new AssertionError(format("Unsupported streaming component detected %s", unsupported));
   ```
   
   However, it might also be worth looking at why we use `CUSTOM` for SAI components in the first place. Is now the time to simply create a new first-class type in `Types`?
   
   ```
   public static final Component.Type SAI = Component.Type.create("SAI", "SAI\\+.*.db", null);
   ```
   
   (The regex could be stricter, but details...)
   
   If we do this, the `CUSTOM` in the check above becomes `SAI`, we replace `CUSTOM` with `SAI` in a few places in the SAI code (like `Version`), and things should just work, right?
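A self-contained sketch of the interim check suggested above, using simplified stand-ins (`Type`, `Component`) instead of the real `Component.Type` and `Component` classes; these names and the `accepts` helper are hypothetical:

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the suggested validation; not the real Cassandra classes.
final class StreamingValidation
{
    enum Type { DATA, STATS, TOC, CUSTOM }

    record Component(Type type, String name) {}

    // Rejects components that are neither core streaming components nor CUSTOM ones.
    static void validate(Set<Component> incoming, Set<Component> formatStreamingComponents)
    {
        Set<Component> unsupported = incoming.stream()
                                             .filter(c -> !formatStreamingComponents.contains(c))
                                             .filter(c -> c.type() != Type.CUSTOM)
                                             .collect(Collectors.toSet());

        if (!unsupported.isEmpty())
            throw new AssertionError(String.format("Unsupported streaming component detected %s", unsupported));
    }

    // Convenience wrapper: true if validate() accepts the incoming set.
    static boolean accepts(Set<Component> incoming, Set<Component> formatStreamingComponents)
    {
        try
        {
            validate(incoming, formatStreamingComponents);
            return true;
        }
        catch (AssertionError e)
        {
            return false;
        }
    }

    public static void main(String[] args)
    {
        Component data = new Component(Type.DATA, "Data.db");
        Component stats = new Component(Type.STATS, "Statistics.db");
        Component toc = new Component(Type.TOC, "TOC.txt");
        Component sai = new Component(Type.CUSTOM, "SAI+aa+GroupComplete.db");
        Set<Component> core = Set.of(data, stats);

        System.out.println(accepts(Set.of(data, stats, sai), core)); // true: CUSTOM is let through
        System.out.println(accepts(Set.of(data, toc), core));        // false: TOC is not a streaming component
    }
}
```

The trade-off is that every `CUSTOM` component passes unchecked, which is part of why a dedicated type is floated above.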



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1236617154


##########
src/java/org/apache/cassandra/db/streaming/ComponentManifest.java:
##########
@@ -52,13 +53,17 @@ public ComponentManifest(Map<Component, Long> components)
     }
 
     @VisibleForTesting
-    public static ComponentManifest create(Descriptor descriptor)
+    public static ComponentManifest create(SSTable sstable)
     {
-        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
+        Set<Component> streamingComponents = sstable.getComponents();
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(streamingComponents.size());
 
-        for (Component component : descriptor.getFormat().streamingComponents())
+        for (Component component : streamingComponents)
         {
-            File file = descriptor.fileFor(component);
+            if (component == SSTableFormat.Components.TOC)

Review Comment:
   First, I like the idea of having a `getStreamingComponents` method on the sstable. That seems a better place to decide which components are eligible for streaming.
   
   However, even with such a method, we still have a "responsibility leak" from SAI here.
   
   ```
   c.type == Components.Types.SAI
   ```
   
   Why should SSTable code know anything about SAI? Why should it assume all SAI components should be streamed? IMHO this should be decided by the SAI code (or whoever owns the component), not by the sstable code.
   
   SAI already controls the type of the component, so I guess the ideal-world solution would be to associate this information directly with the type. Then, instead of special-casing SAI vs. core components, we could write the condition like this:
   
   ```
   return components.stream()
                    .filter(c -> c.type.shouldBeStreamed)
                    .collect(Collectors.toSet());
   ```
   
   WDYT?
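A minimal sketch of that proposal, assuming a hypothetical `shouldBeStreamed` flag on a simplified component type (the real `Component.Type` may differ); the SAI type here is only a stand-in for a type the index module would define:

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the proposal; not the real Cassandra classes.
final class TypeOwnedStreaming
{
    // Each component type carries its own streaming decision.
    record ComponentType(String name, boolean shouldBeStreamed) {}

    record Component(ComponentType type, String fileName) {}

    // Core types.
    static final ComponentType DATA = new ComponentType("DATA", true);
    static final ComponentType TOC = new ComponentType("TOC", false);
    // Would be defined by the SAI module rather than by core sstable code (illustrative name).
    static final ComponentType SAI_GROUP_COMPLETE = new ComponentType("SAI+GroupComplete", true);

    // The sstable side only consults the type; it needs no knowledge of SAI.
    static Set<Component> getStreamingComponents(Set<Component> components)
    {
        return components.stream()
                         .filter(c -> c.type().shouldBeStreamed())
                         .collect(Collectors.toSet());
    }

    public static void main(String[] args)
    {
        Set<Component> all = Set.of(new Component(DATA, "Data.db"),
                                    new Component(TOC, "TOC.txt"),
                                    new Component(SAI_GROUP_COMPLETE, "SAI+aa+GroupComplete.db"));
        System.out.println(getStreamingComponents(all).size()); // 2: TOC is excluded
    }
}
```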



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1236667069


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -61,12 +58,8 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if (!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component detected %s",
-                                            Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
-

Review Comment:
   As for this:
   ```
   public static final Component.Type SAI = Component.Type.create("SAI", "SAI\\+.*.db", null);
   ```
   
   This wouldn't be very consistent with how component types are currently used in the core, where each core sstable component has its own type. So why should we lump all the different SAI component types into a single type?
   
   I think we need to take another look at how custom components are registered and simply allow modules to provide their own types. So SAI component types should be defined in SAI. We could also allow the component type to decide whether it should be streamed or not.
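One possible shape for letting modules provide their own types is a registry that resolves a component file name to a type carrying its own streaming flag. This is a hypothetical sketch (`ComponentTypeRegistry` is not an existing Cassandra class), with illustrative file-name patterns:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical registry in which core code and modules (such as an index implementation)
// register their own component types; not an existing Cassandra API.
final class ComponentTypeRegistry
{
    record Type(String name, Pattern fileNamePattern, boolean streamable) {}

    private final List<Type> types = new ArrayList<>();

    Type register(String name, String fileNameRegex, boolean streamable)
    {
        Type type = new Type(name, Pattern.compile(fileNameRegex), streamable);
        types.add(type);
        return type;
    }

    // Returns the first registered type whose pattern matches the whole component file name.
    Optional<Type> resolve(String componentFileName)
    {
        return types.stream()
                    .filter(t -> t.fileNamePattern().matcher(componentFileName).matches())
                    .findFirst();
    }

    public static void main(String[] args)
    {
        ComponentTypeRegistry registry = new ComponentTypeRegistry();
        // Registered by core sstable code.
        registry.register("DATA", "Data\\.db", true);
        registry.register("TOC", "TOC\\.txt", false);
        // Registered by the SAI module itself, one type per SAI component (illustrative).
        registry.register("SAI+GroupComplete", "SAI\\+.*\\+GroupComplete\\.db", true);

        System.out.println(registry.resolve("SAI+aa+GroupComplete.db").map(Type::streamable)); // Optional[true]
        System.out.println(registry.resolve("TOC.txt").map(Type::streamable));                 // Optional[false]
        System.out.println(registry.resolve("Unknown.db").isPresent());                        // false
    }
}
```

First-match resolution keeps the sketch short; a real design would also need to decide what happens when two registered patterns overlap.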
   



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235719269


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    // zero copy streaming sends all components, so the events will include non-Data files as well
+    private static final int NUM_COMPONENTS = 17;

Review Comment:
   nit: We can probably make this a little more self-documenting if we sum up `V1OnDiskFormat.PER_SSTABLE_COMPONENTS`, `V1OnDiskFormat.LITERAL_COMPONENTS`, and the expected core SSTable components (perhaps from `SSTableFormat#streamingComponents()` sans compression info, which I guess we don't have here?)



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1240035239


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -59,7 +60,18 @@ public interface SSTableFormat<R extends SSTableReader, W extends SSTableWriter>
      */
     Set<Component> allComponents();
 
-    Set<Component> streamingComponents();
+    /**
+     * Returns the components that should be streamed to other nodes on repair / rebuild.
+     * This includes only the core SSTable components produced by this format.
+     * Custom components registered by e.g. secondary indexes are not included.
+     * Use {@link SSTableReader#getStreamingComponents()} for the list of all components including the custom ones.
+     */
+    default Set<Component> streamingComponents()

Review Comment:
   Once all `Component` instances have their "streamability" embedded, we don't need this method at all, right? i.e. `SSTableZeroCopyWriter` will no longer need special casing?
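A hypothetical sketch of what that could look like: once the flag lives on the type, a format's streaming set can be derived from `allComponents()`, and the receiver-side check no longer needs the format or any `CUSTOM`/SAI special case. The types are simplified stand-ins, not the real `SSTableFormat` or `SSTableZeroCopyWriter`:

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model; not the real SSTableFormat or SSTableZeroCopyWriter.
final class FlagDrivenStreaming
{
    record Type(String name, boolean streamable) {}

    record Component(Type type, String fileName) {}

    interface Format
    {
        Set<Component> allComponents();

        // Derived from the per-type flag, so there is no separate list to keep in sync.
        default Set<Component> streamingComponents()
        {
            return allComponents().stream()
                                  .filter(c -> c.type().streamable())
                                  .collect(Collectors.toSet());
        }
    }

    // Receiver-side check that needs neither the format nor a CUSTOM/SAI special case.
    static void checkStreamable(Set<Component> incoming)
    {
        for (Component c : incoming)
        {
            if (!c.type().streamable())
                throw new AssertionError("Unsupported streaming component detected " + c);
        }
    }

    public static void main(String[] args)
    {
        Component data = new Component(new Type("DATA", true), "Data.db");
        Component toc = new Component(new Type("TOC", false), "TOC.txt");
        Format format = () -> Set.of(data, toc);
        System.out.println(format.streamingComponents().equals(Set.of(data))); // true
        checkStreamable(Set.of(data)); // passes without throwing
    }
}
```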



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242493940


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {

Review Comment:
   nit: newline for `{`



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235792275


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -61,12 +58,8 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if (!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component detected %s",
-                                            Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
-

Review Comment:
   BTW, I just did exactly this, and it works fine... at least `IndexStreamingTest` has no issues.



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1236584707


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    // zero copy streaming sends all components, so the events will include non-Data files as well
+    private static final int NUM_COMPONENTS = 17;

Review Comment:
   Good idea. Fixed.



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1237331633


##########
src/java/org/apache/cassandra/db/streaming/ComponentManifest.java:
##########
@@ -52,13 +53,17 @@ public ComponentManifest(Map<Component, Long> components)
     }
 
     @VisibleForTesting
-    public static ComponentManifest create(Descriptor descriptor)
+    public static ComponentManifest create(SSTable sstable)
     {
-        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
+        Set<Component> streamingComponents = sstable.getComponents();
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(streamingComponents.size());
 
-        for (Component component : descriptor.getFormat().streamingComponents())
+        for (Component component : streamingComponents)
         {
-            File file = descriptor.fileFor(component);
+            if (component == SSTableFormat.Components.TOC)

Review Comment:
   Yeah, at the end of the day, some things are just not part of the SSTable format, so if/when that is the case, having the component decide whether it's "streamable" doesn't seem like a bad idea. (Or we could do that for all components.)
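   A minimal, self-contained model of letting each component type decide whether it is streamable. `ComponentType`, `SSTableComponent` and `streamingComponents` are hypothetical names, not Cassandra's `Component` API; they only show the shape of filtering an sstable's registered components by a per-type flag, which is roughly what the `isStreamable` parameter later added to `Component.Type.create` makes possible. Marking TOC as non-streamable here is purely illustrative.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StreamableComponentsDemo
{
    // Hypothetical stand-in for a component type: the flag lives on the type, not on the format.
    record ComponentType(String name, boolean streamable) {}

    // Hypothetical stand-in for a concrete component registered by an sstable.
    record SSTableComponent(ComponentType type, String fileName) {}

    static final ComponentType DATA = new ComponentType("DATA", true);
    static final ComponentType TOC = new ComponentType("TOC", false);          // illustrative only
    static final ComponentType SAI_GROUP = new ComponentType("SAI_GROUP", true);

    // Keep only components whose type says they should be streamed, preserving registration order.
    static Set<SSTableComponent> streamingComponents(List<SSTableComponent> registered)
    {
        return registered.stream()
                         .filter(c -> c.type().streamable())
                         .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args)
    {
        List<SSTableComponent> registered = List.of(new SSTableComponent(DATA, "Data.db"),
                                                    new SSTableComponent(TOC, "TOC.txt"),
                                                    new SSTableComponent(SAI_GROUP, "SAI+aa+GroupComplete.db"));
        streamingComponents(registered).forEach(c -> System.out.println(c.fileName()));
    }
}
```

   The design point is that a custom index can register a new type with its own flag, and the streaming code never needs to know about it by name.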



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243570674


##########
src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java:
##########
@@ -79,10 +84,20 @@ public enum IndexComponent
      */
     GROUP_COMPLETION_MARKER("GroupComplete");
 
+    public final Component.Type type;
     public final String name;
 
     IndexComponent(String name)
     {
         this.name = name;
+        this.type = componentType(name);
+    }
+
+    private static Component.Type componentType(String name)
+    {
+        return Component.Type.create(SAI_DESCRIPTOR + SAI_SEPARATOR + name.toUpperCase(),
+                                     SAI_DESCRIPTOR + ".*." + name + ".db",

Review Comment:
   Fixed, and also added pattern quoting.
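   A sketch of what "pattern quoting" can look like: `Pattern.quote` turns literal fragments into regex-safe text, so the `+` separator and the `.` in `.db` are matched literally while the `.*` between them stays a wildcard. The values `SAI` and `+` for `SAI_DESCRIPTOR` and `SAI_SEPARATOR`, and the exact shape of the pattern, are assumptions rather than the PR's final code.

```java
import java.util.regex.Pattern;

public class QuotedComponentPattern
{
    // Assumed values; the real constants live in the SAI code.
    static final String SAI_DESCRIPTOR = "SAI";
    static final String SAI_SEPARATOR = "+";

    // Literal parts are quoted; only the ".*" between them is a regex wildcard.
    static Pattern componentPattern(String name)
    {
        return Pattern.compile(Pattern.quote(SAI_DESCRIPTOR + SAI_SEPARATOR)
                               + ".*"
                               + Pattern.quote(SAI_SEPARATOR + name + ".db"));
    }

    public static void main(String[] args)
    {
        Pattern p = componentPattern("GroupComplete");
        System.out.println(p.matcher("SAI+aa+GroupComplete.db").matches()); // true
        System.out.println(p.matcher("SAI+aa+GroupCompleteXdb").matches()); // false
    }
}
```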



[GitHub] [cassandra] pkolaczk commented on pull request #2420: CASSANDRA-18345 Stream all components registered by an SSTable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#issuecomment-1611057306

   CI run: https://app.circleci.com/pipelines/github/pkolaczk/cassandra/16/workflows/41cc8da8-cf2b-4985-be0c-cf5ffd9598d8


[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235814413


##########
src/java/org/apache/cassandra/db/streaming/ComponentManifest.java:
##########
@@ -52,13 +53,17 @@ public ComponentManifest(Map<Component, Long> components)
     }
 
     @VisibleForTesting
-    public static ComponentManifest create(Descriptor descriptor)
+    public static ComponentManifest create(SSTable sstable)
     {
-        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
+        Set<Component> streamingComponents = sstable.getComponents();
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(streamingComponents.size());
 
-        for (Component component : descriptor.getFormat().streamingComponents())
+        for (Component component : streamingComponents)
         {
-            File file = descriptor.fileFor(component);
+            if (component == SSTableFormat.Components.TOC)

Review Comment:
   We could also hide this in `SSTable` behind a `getStreamingComponents()` method and leave the logic in the manifest almost unchanged. It already makes a copy of `components`, and this isn't really a hot path anyway, so we could filter there with something like:
   
   ```
   public Set<Component> getStreamingComponents()
   {
       return components.stream()
                        .filter(c -> descriptor.getFormat().streamingComponents().contains(c) || c.type == Components.Types.SAI)
                        .collect(Collectors.toSet());
   }
   ```



[GitHub] [cassandra] adelapena commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "adelapena (via GitHub)" <gi...@apache.org>.
adelapena commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242034910


##########
src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java:
##########
@@ -82,9 +82,9 @@ public static class Components extends SSTableFormat.Components
         public static class Types extends SSTableFormat.Components.Types
         {
             // index of the row keys with pointers to their positions in the data file
-            public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class);
+            public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true, BigFormat.class);

Review Comment:
   We'll need to update [the doc on `SSTable_API.md`](https://github.com/apache/cassandra/blob/cep-7-sai/src/java/org/apache/cassandra/io/sstable/SSTable_API.md#components) to include the new `streamable` parameter.



##########
src/java/org/apache/cassandra/io/sstable/Component.java:
##########
@@ -60,31 +61,34 @@ public final static class Type
         /**
          * Creates a new non-singleton type and registers it a global type registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all parent formats
-         * @param repr        the regular expression to be used to recognize a name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all parent formats
+         * @param repr         the regular expression to be used to recognize a name represents this type
+         * @param isStreamable whether components of this type should be streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type create(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
+        public static Type create(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)
         {
-            return new Type(name, repr, false, formatClass);
+            return new Type(name, repr, false, isStreamable, formatClass);
         }
 
         /**
          * Creates a new singleton type and registers it in a global type registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all parent formats
-         * @param repr        the regular expression to be used to recognize a name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all parent formats
+         * @param repr         the regular expression to be used to recognize a name represents this type
+         * @param isStreamable whether components of this type should be streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type createSingleton(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
+        public static Type createSingleton(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)

Review Comment:
   Nit: the parameter could be named `streamable` for consistency with the attribute and constructor names.



##########
src/java/org/apache/cassandra/io/sstable/Component.java:
##########
@@ -60,31 +61,34 @@ public final static class Type
         /**
          * Creates a new non-singleton type and registers it a global type registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all parent formats
-         * @param repr        the regular expression to be used to recognize a name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all parent formats
+         * @param repr         the regular expression to be used to recognize a name represents this type
+         * @param isStreamable whether components of this type should be streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type create(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
+        public static Type create(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)
         {
-            return new Type(name, repr, false, formatClass);
+            return new Type(name, repr, false, isStreamable, formatClass);

Review Comment:
   Nit: the parameter could be named `streamable` for consistency with the attribute and constructor names.



##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+            // all cases of the vtable
+            cluster.schemaChange(withKeyspace(
+                "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob) WITH compression = { 'enabled' : false };"
+            ));
+            cluster.schemaChange(withKeyspace(
+                "CREATE CUSTOM INDEX ON %s.test(v) USING 'StorageAttachedIndex';"
+            ));
+            cluster.stream().forEach(i ->
+                i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()
+            );
+            IInvokableInstance first = cluster.get(1);
+            IInvokableInstance second = cluster.get(2);
+            long sstableCount = 10;
+            long expectedFiles = zeroCopyStreaming ? sstableCount * NUM_COMPONENTS : sstableCount;
+            for (int i = 0; i < sstableCount; i++)
+            {
+                first.executeInternal(withKeyspace("insert into %s.test(pk, v, b) values (?, ?, ?)"), i, "v" + i, BLOB);
+                first.flush(KEYSPACE);
+            }
+
+            second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success();
+
+            SimpleQueryResult qr = first.executeInternalWithResult("SELECT * FROM system_views.streaming");
+            String txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+            Row row = qr.next();
+            QueryResultUtil.assertThat(row)
+                           .isEqualTo("peers", Collections.singletonList("/127.0.0.2:7012"))

Review Comment:
   ```suggestion
                              .isEqualTo("peers", Collections.singletonList(second.broadcastAddress().toString()))
   ```



##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   The build fails here: `eclipse-warnings` reports `Potential resource leak: 'writer' may not be closed`.
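   One common way to handle this kind of Eclipse warning, sketched under the assumption that the writer is owned by `componentWriters` and closed later in `close()`: look it up, fail fast if it is missing, and annotate the method with `@SuppressWarnings("resource")` so the compiler does not expect a local close. The class below is a simplified, hypothetical stand-in (a plain `Writer` in place of `SequentialWriter`), not the PR's actual fix.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;

public class ComponentWriters implements AutoCloseable
{
    // Writers are owned by this map and closed together in close().
    private final Map<String, Writer> componentWriters = new HashMap<>();

    void register(String componentName, Writer writer)
    {
        componentWriters.put(componentName, writer);
    }

    // The looked-up writer is intentionally not closed here; its lifecycle belongs to close().
    @SuppressWarnings("resource")
    void writeComponent(String componentName, String content) throws IOException
    {
        Writer writer = componentWriters.get(componentName);
        if (writer == null)
            throw new IllegalStateException("No writer registered for component " + componentName);
        writer.write(content);
    }

    @Override
    public void close() throws IOException
    {
        for (Writer writer : componentWriters.values())
            writer.close();
    }

    public static void main(String[] args) throws IOException
    {
        StringWriter data = new StringWriter();
        try (ComponentWriters writers = new ComponentWriters())
        {
            writers.register("Data.db", data);
            writers.writeComponent("Data.db", "hello");
        }
        System.out.println(data); // hello
    }
}
```

   Suppressing the warning only makes sense if ownership really is centralized like this; otherwise the leak report may be genuine.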



##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+            // all cases of the vtable
+            cluster.schemaChange(withKeyspace(
+                "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob) WITH compression = { 'enabled' : false };"
+            ));
+            cluster.schemaChange(withKeyspace(
+                "CREATE CUSTOM INDEX ON %s.test(v) USING 'StorageAttachedIndex';"
+            ));
+            cluster.stream().forEach(i ->
+                i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()
+            );
+            IInvokableInstance first = cluster.get(1);
+            IInvokableInstance second = cluster.get(2);
+            long sstableCount = 10;
+            long expectedFiles = zeroCopyStreaming ? sstableCount * NUM_COMPONENTS : sstableCount;
+            for (int i = 0; i < sstableCount; i++)
+            {
+                first.executeInternal(withKeyspace("insert into %s.test(pk, v, b) values (?, ?, ?)"), i, "v" + i, BLOB);
+                first.flush(KEYSPACE);
+            }
+
+            second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success();
+
+            SimpleQueryResult qr = first.executeInternalWithResult("SELECT * FROM system_views.streaming");
+            String txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+            Row row = qr.next();
+            QueryResultUtil.assertThat(row)
+                           .isEqualTo("peers", Collections.singletonList("/127.0.0.2:7012"))
+                           .isEqualTo("follower", true)
+                           .isEqualTo("operation", "Rebuild")
+                           .isEqualTo("status", "success")
+                           .isEqualTo("progress_percentage", 100.0F)
+                           .isEqualTo("success_message", null).isEqualTo("failure_cause", null)
+                           .isEqualTo("files_sent", expectedFiles)
+                           .columnsEqualTo("files_sent", "files_to_send")
+                           .columnsEqualTo("bytes_sent", "bytes_to_send")
+                           .isEqualTo("files_received", 0L)
+                           .columnsEqualTo("files_received", "files_to_receive", "bytes_received", "bytes_to_receive");
+            long totalBytes = row.getLong("bytes_sent");
+            assertThat(totalBytes).isGreaterThan(0);
+
+            qr = second.executeInternalWithResult("SELECT * FROM system_views.streaming");
+            txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+
+            QueryResultUtil.assertThat(qr.next())
+                           .isEqualTo("peers", Collections.singletonList("/127.0.0.1:7012"))
+                           .isEqualTo("follower", false)
+                           .isEqualTo("operation", "Rebuild")
+                           .isEqualTo("status", "success")
+                           .isEqualTo("progress_percentage", 100.0F)
+                           .isEqualTo("success_message", null).isEqualTo("failure_cause", null)
+                           .columnsEqualTo("files_to_receive", "files_received").isEqualTo("files_received", expectedFiles)
+                           .columnsEqualTo("bytes_to_receive", "bytes_received").isEqualTo("bytes_received", totalBytes)
+                           .columnsEqualTo("files_sent", "files_to_send", "bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L);
+
+            // did we trigger slow event log?
+            cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling streaming events took longer than").getResult()).describedAs("Unable to find slow log for node%d", i.config().num()).isNotEmpty());

Review Comment:
   Nit: break up the long line:
   ```suggestion
               cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling streaming events took longer than").getResult())
                                              .describedAs("Unable to find slow log for node%d", i.config().num())
                                              .isNotEmpty());
   ```



##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+            // all cases of the vtable

Review Comment:
   Maybe this comment would be better placed either right before the `INSERT` or when declaring the `BLOB` size.
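
   A minimal sketch of that placement, assuming the comment moves next to the constant (the `BlobFixture` class name is hypothetical; in the dtest the field lives in `IndexStreamingTest`):

   ```java
   import java.nio.ByteBuffer;

   class BlobFixture
   {
       // Streaming sends progress events every ~65k bytes (per the test's own comment), so the
       // streamed files must be larger than that to hit all cases of the streaming vtable.
       // 1 << 16 = 65,536 bytes; with compression disabled, each Data.db file holding this blob
       // plus the key, the text column and row overhead ends up larger than that.
       static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
   }
   ```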



##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))

Review Comment:
   Nit: break the long line
   ```suggestion
                                                                .set("stream_entire_sstables", zeroCopyStreaming)
                                                                .set("streaming_slow_events_log_timeout", "0s"))
   ```



##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+            // all cases of the vtable
+            cluster.schemaChange(withKeyspace(
+                "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob) WITH compression = { 'enabled' : false };"
+            ));
+            cluster.schemaChange(withKeyspace(
+                "CREATE CUSTOM INDEX ON %s.test(v) USING 'StorageAttachedIndex';"
+            ));
+            cluster.stream().forEach(i ->
+                i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()
+            );
+            IInvokableInstance first = cluster.get(1);
+            IInvokableInstance second = cluster.get(2);
+            long sstableCount = 10;
+            long expectedFiles = zeroCopyStreaming ? sstableCount * NUM_COMPONENTS : sstableCount;
+            for (int i = 0; i < sstableCount; i++)
+            {
+                first.executeInternal(withKeyspace("insert into %s.test(pk, v, b) values (?, ?, ?)"), i, "v" + i, BLOB);
+                first.flush(KEYSPACE);
+            }
+
+            second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success();
+
+            SimpleQueryResult qr = first.executeInternalWithResult("SELECT * FROM system_views.streaming");
+            String txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+            Row row = qr.next();
+            QueryResultUtil.assertThat(row)
+                           .isEqualTo("peers", Collections.singletonList("/127.0.0.2:7012"))
+                           .isEqualTo("follower", true)
+                           .isEqualTo("operation", "Rebuild")
+                           .isEqualTo("status", "success")
+                           .isEqualTo("progress_percentage", 100.0F)
+                           .isEqualTo("success_message", null).isEqualTo("failure_cause", null)
+                           .isEqualTo("files_sent", expectedFiles)
+                           .columnsEqualTo("files_sent", "files_to_send")
+                           .columnsEqualTo("bytes_sent", "bytes_to_send")
+                           .isEqualTo("files_received", 0L)
+                           .columnsEqualTo("files_received", "files_to_receive", "bytes_received", "bytes_to_receive");
+            long totalBytes = row.getLong("bytes_sent");
+            assertThat(totalBytes).isGreaterThan(0);
+
+            qr = second.executeInternalWithResult("SELECT * FROM system_views.streaming");
+            txt = QueryResultUtil.expand(qr);
+            qr.reset();
+            assertThat(qr.toObjectArrays().length).describedAs("Found rows\n%s", txt).isEqualTo(1);
+            assertThat(qr.hasNext()).isTrue();
+
+            QueryResultUtil.assertThat(qr.next())
+                           .isEqualTo("peers", Collections.singletonList("/127.0.0.1:7012"))

Review Comment:
   ```suggestion
                              .isEqualTo("peers", Collections.singletonList(first.broadcastAddress().toString()))
   ```
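
   The suggestion replaces the hard-coded `/127.0.0.1:7012` with a value derived from the instance. For a resolved `InetSocketAddress` built from a literal IP without a host name, `toString()` renders as `/127.0.0.1:7012`, i.e. the same string the hard-coded version expects. A minimal sketch, assuming `broadcastAddress()` returns such an address (the `PeerExpectations` helper is hypothetical):

   ```java
   import java.net.InetAddress;
   import java.net.InetSocketAddress;
   import java.net.UnknownHostException;
   import java.util.Collections;
   import java.util.List;

   class PeerExpectations
   {
       // Hypothetical helper: derives the expected "peers" value from the address object itself,
       // so the assertion keeps working if the dtest cluster uses different IPs or ports.
       static List<String> expectedPeers(InetSocketAddress broadcastAddress)
       {
           return Collections.singletonList(broadcastAddress.toString());
       }

       // Builds a resolved address from a raw IP without a host name (no reverse DNS lookup),
       // which is why toString() renders as "/<ip>:<port>".
       static InetSocketAddress literal(byte[] ip, int port)
       {
           try
           {
               return new InetSocketAddress(InetAddress.getByAddress(ip), port);
           }
           catch (UnknownHostException e)
           {
               // Only thrown when the IP array has an illegal length.
               throw new IllegalArgumentException(e);
           }
       }
   }
   ```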



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243310283


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -59,7 +60,18 @@ public interface SSTableFormat<R extends SSTableReader, W extends SSTableWriter>
      */
     Set<Component> allComponents();
 
-    Set<Component> streamingComponents();
+    /**
+     * Returns the components that should be streamed to other nodes on repair / rebuild.
+     * This includes only the core SSTable components produced by this format.
+     * Custom components registered by e.g. secondary indexes are not included.
+     * Use {@link SSTableReader#getStreamingComponents()} for the list of all components including the custom ones.
+     */
+    default Set<Component> streamingComponents()

Review Comment:
   I simply moved it to the testing code.



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: CASSANDRA-18345 Stream all components registered by an SSTable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1244925506


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   fixed



[GitHub] [cassandra] adelapena commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "adelapena (via GitHub)" <gi...@apache.org>.
adelapena commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243581101


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   I get this error locally when running `eclipse-warnings` with JDK 8:
   ```
   eclipse-warnings:
       [mkdir] Created dir: /Users/adelapena/src/cassandra/trunk/build/ecj
        [echo] Running Eclipse Code Analysis.  Output logged to /Users/adelapena/src/cassandra/trunk/build/ecj/eclipse_compiler_checks.txt
        [java] ----------
        [java] 1. ERROR in /Users/adelapena/src/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java (at line 202)
        [java] 	SequentialWriter writer = componentWriters.get(component.name);
        [java] 	                 ^^^^^^
        [java] Potential resource leak: 'writer' may not be closed
        [java] ----------
        [java] 1 problem (1 error)
   
   BUILD FAILED
   ```
   CircleCI also hits it for j8: https://app.circleci.com/pipelines/github/adelapena/cassandra/2986/workflows/82e3ccc4-83ce-49c8-ab0d-036d100cae8e/jobs/54244
   
   However, it doesn't fail for j11 builds: https://app.circleci.com/pipelines/github/adelapena/cassandra/2986/workflows/554d79db-c157-4e53-944a-ced1dffc242f
   
   Do you have a link to the CircleCI run? Maybe it was a j11-only run? j8 support will be dropped very soon ([CASSANDRA-18255](https://issues.apache.org/jira/browse/CASSANDRA-18255)), but we still need it.
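
   ECJ reports a `Closeable` local obtained from a map because it cannot see that the map owns it. One common way to handle this, assuming the writers are owned by the map and closed in `close()`, is to annotate the method with `@SuppressWarnings("resource")`, a token ECJ honours. A minimal sketch with a hypothetical, simplified stand-in for `SSTableZeroCopyWriter` that uses plain `OutputStream`s instead of `SequentialWriter` (this is not necessarily how the PR resolved it):

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.OutputStream;
   import java.util.LinkedHashMap;
   import java.util.Map;

   class ComponentCopier implements Closeable
   {
       // Writers are owned by this map and closed together in close(), never per call.
       private final Map<String, OutputStream> componentWriters = new LinkedHashMap<>();

       void register(String componentName, OutputStream out)
       {
           componentWriters.put(componentName, out);
       }

       // 'writer' is borrowed from the map, so closing it here would be wrong;
       // the annotation tells ECJ the "potential resource leak" is intentional.
       @SuppressWarnings("resource")
       void writeComponent(String componentName, InputStream in, long size) throws IOException
       {
           OutputStream writer = componentWriters.get(componentName);
           if (writer == null)
               throw new IllegalArgumentException("No writer registered for component " + componentName);

           byte[] buffer = new byte[8192];
           long remaining = size;
           while (remaining > 0)
           {
               int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
               if (read < 0)
                   throw new IOException("Unexpected end of stream, " + remaining + " bytes missing");
               writer.write(buffer, 0, read);
               remaining -= read;
           }
       }

       @Override
       public void close() throws IOException
       {
           for (OutputStream writer : componentWriters.values())
               writer.close();
       }
   }
   ```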



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1241798984


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);
             // custom component, used by e.g. custom compaction strategy
-            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, null);
+            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, true, null);

Review Comment:
   Yes, that is the plan. I haven't finished that yet.
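
   The hunk above marks every type as streamable except `TOC`. A minimal sketch of that idea with a simplified, hypothetical `ComponentType` (not Cassandra's `Component.Type`): each type carries its streamable flag from construction, and the streamable subset is then a plain filter, mirroring `.filter(c -> c.type.streamable)` in the dtest.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   class ComponentTypes
   {
       // Simplified stand-in for Component.Type: a name, a file-name pattern and a streamable flag.
       static final class ComponentType
       {
           final String name;
           final String repr;
           final boolean streamable;

           ComponentType(String name, String repr, boolean streamable)
           {
               this.name = name;
               this.repr = repr;
               this.streamable = streamable;
           }
       }

       // Flags copied from the diff: only the table of contents is not streamed.
       static final List<ComponentType> TYPES = Arrays.asList(
           new ComponentType("DATA", "Data.db", true),
           new ComponentType("COMPRESSION_INFO", "CompressionInfo.db", true),
           new ComponentType("STATS", "Statistics.db", true),
           new ComponentType("FILTER", "Filter.db", true),
           new ComponentType("DIGEST", "Digest.crc32", true),
           new ComponentType("CRC", "CRC.db", true),
           new ComponentType("TOC", "TOC.txt", false),
           new ComponentType("SECONDARY_INDEX", "SI_.*.db", true),
           new ComponentType("CUSTOM", null, true));

       // Names of all types whose components should be sent to other nodes.
       static List<String> streamableTypeNames()
       {
           return TYPES.stream()
                       .filter(t -> t.streamable)
                       .map(t -> t.name)
                       .collect(Collectors.toList());
       }
   }
   ```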



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235719269


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,134 @@
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    // zero copy streaming sends all components, so the events will include non-Data files as well
+    private static final int NUM_COMPONENTS = 17;

Review Comment:
   nit: We can probably make this a little more self-documenting if we sum up `V1OnDiskFormat.PER_SSTABLE_COMPONENTS`, `V1OnDiskFormat.LITERAL_COMPONENTS`, and the expected core SSTable components (perhaps from `SSTableFormat#streamingComponents()`?)
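
   A minimal sketch of such a self-documenting count. The name sets are hypothetical placeholders standing in for `SSTableFormat#streamingComponents()`, `V1OnDiskFormat.PER_SSTABLE_COMPONENTS` and `V1OnDiskFormat.LITERAL_COMPONENTS`; their real contents are not reproduced here. As in the dtest, entire-sstable (zero-copy) streaming sends every component file, the other path sends one file per sstable, and the compression component is left out because the table is created with compression disabled.

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.Set;

   class ExpectedStreamedFiles
   {
       // Hypothetical placeholder sets; the real test derives these from the sstable format
       // and from V1OnDiskFormat instead of hard-coding names.
       static final Set<String> CORE_STREAMABLE = new LinkedHashSet<>(Arrays.asList(
           "Data.db", "CompressionInfo.db", "Statistics.db", "Filter.db", "Digest.crc32", "CRC.db"));
       static final Set<String> PER_SSTABLE_INDEX = new LinkedHashSet<>(Arrays.asList("per-sstable-1", "per-sstable-2"));
       static final Set<String> PER_COLUMN_INDEX = new LinkedHashSet<>(Arrays.asList("per-column-1", "per-column-2"));

       static int componentsPerSSTable(boolean compressionEnabled)
       {
           int core = CORE_STREAMABLE.size();
           // With compression disabled no CompressionInfo.db file exists, so it is not streamed.
           if (!compressionEnabled && CORE_STREAMABLE.contains("CompressionInfo.db"))
               core -= 1;
           return core + PER_SSTABLE_INDEX.size() + PER_COLUMN_INDEX.size();
       }

       static long expectedFiles(long sstableCount, boolean zeroCopy, boolean compressionEnabled)
       {
           // Zero-copy streaming sends every component file; otherwise one file per sstable.
           return zeroCopy ? sstableCount * componentsPerSSTable(compressionEnabled) : sstableCount;
       }
   }
   ```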



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235812969


##########
src/java/org/apache/cassandra/db/streaming/ComponentManifest.java:
##########
@@ -52,13 +53,17 @@ public ComponentManifest(Map<Component, Long> components)
     }
 
     @VisibleForTesting
-    public static ComponentManifest create(Descriptor descriptor)
+    public static ComponentManifest create(SSTable sstable)
     {
-        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
+        Set<Component> streamingComponents = sstable.getComponents();
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(streamingComponents.size());
 
-        for (Component component : descriptor.getFormat().streamingComponents())
+        for (Component component : streamingComponents)
         {
-            File file = descriptor.fileFor(component);
+            if (component == SSTableFormat.Components.TOC)

Review Comment:
   nit: I guess the spirit of the original code here was to delegate this as much as possible to the SSTable format. What if we tried to preserve that (and coupled it with the `SAI` component I suggested below) and did something like...
   
   ```
   if (!sstable.descriptor.getFormat().streamingComponents().contains(component) && component.type != BigFormat.Components.Types.SAI)
   ```
   I like the idea of using `sstable.getComponents()` to make sure we don't forget anything, but it seems like picking out the `TOC` alone is a little brittle. WDYT?
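
   A minimal sketch of the two filtering rules being compared, using plain strings for components and a hypothetical `"SAI"` type tag (the `SAI` type is only a proposal in this thread). The first rule skips only the TOC, so any unexpected extra component is streamed; the second delegates to the format's declared streaming set and admits index components by type, so an undeclared component is not streamed.

   ```java
   import java.util.Set;

   class StreamingFilter
   {
       // Simplified component: a file name plus a type tag.
       static final class Comp
       {
           final String name;
           final String type;

           Comp(String name, String type)
           {
               this.name = name;
               this.type = type;
           }
       }

       // Rule in the PR at this point: stream everything except the table of contents.
       static boolean streamUnlessToc(Comp c)
       {
           return !c.name.equals("TOC.txt");
       }

       // Rule suggested in review: stream what the format declares, plus components of the proposed SAI type.
       static boolean streamIfDeclaredOrIndex(Comp c, Set<String> formatStreamingComponents)
       {
           return formatStreamingComponents.contains(c.name) || c.type.equals("SAI");
       }
   }
   ```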



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235814413


##########
src/java/org/apache/cassandra/db/streaming/ComponentManifest.java:
##########
@@ -52,13 +53,17 @@ public ComponentManifest(Map<Component, Long> components)
     }
 
     @VisibleForTesting
-    public static ComponentManifest create(Descriptor descriptor)
+    public static ComponentManifest create(SSTable sstable)
     {
-        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(descriptor.getFormat().streamingComponents().size());
+        Set<Component> streamingComponents = sstable.getComponents();
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(streamingComponents.size());
 
-        for (Component component : descriptor.getFormat().streamingComponents())
+        for (Component component : streamingComponents)
         {
-            File file = descriptor.fileFor(component);
+            if (component == SSTableFormat.Components.TOC)

Review Comment:
   We could also hide this in `SSTable` with a `getStreamingComponents()` and leave this logic in the manifest almost unchanged?
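
   A minimal sketch of that split, using hypothetical simplified types: the sstable-like object owns the filtering in `getStreamingComponents()`, and manifest creation stays a plain ordered loop. The later revision's Javadoc does point to `SSTableReader#getStreamingComponents()`; the code below is only an illustration of the idea, not that implementation.

   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.LinkedHashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.function.ToLongFunction;

   class ManifestSketch
   {
       // Simplified component: a file name plus whether its type is streamable.
       static final class Comp
       {
           final String name;
           final boolean streamable;

           Comp(String name, boolean streamable)
           {
               this.name = name;
               this.streamable = streamable;
           }
       }

       // Hypothetical stand-in for SSTable: knows all registered components, including custom ones.
       static final class Table
       {
           private final Set<Comp> components;

           Table(Set<Comp> components)
           {
               this.components = components;
           }

           // The filtering lives here, so callers never special-case TOC or index files.
           Set<Comp> getStreamingComponents()
           {
               Set<Comp> result = new LinkedHashSet<>();
               for (Comp c : components)
                   if (c.streamable)
                       result.add(c);
               return Collections.unmodifiableSet(result);
           }
       }

       // One (name -> size) entry per streaming component, preserving iteration order.
       static Map<String, Long> createManifest(Table table, ToLongFunction<String> sizeOf)
       {
           Set<Comp> streaming = table.getStreamingComponents();
           Map<String, Long> manifest = new LinkedHashMap<>(streaming.size());
           for (Comp c : streaming)
               manifest.put(c.name, sizeOf.applyAsLong(c.name));
           return manifest;
       }
   }
   ```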



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243330863


##########
src/java/org/apache/cassandra/io/sstable/Component.java:
##########
@@ -60,31 +61,34 @@ public final static class Type
         /**
          * Creates a new non-singleton type and registers it a global type registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all parent formats
-         * @param repr        the regular expression to be used to recognize a name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all parent formats
+         * @param repr         the regular expression to be used to recognize a name represents this type
+         * @param isStreamable whether components of this type should be streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type create(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
+        public static Type create(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)
         {
-            return new Type(name, repr, false, formatClass);
+            return new Type(name, repr, false, isStreamable, formatClass);

Review Comment:
   Fixed



##########
src/java/org/apache/cassandra/io/sstable/Component.java:
##########
@@ -60,31 +61,34 @@ public final static class Type
         /**
          * Creates a new non-singleton type and registers it a global type registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all parent formats
-         * @param repr        the regular expression to be used to recognize a name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all parent formats
+         * @param repr         the regular expression to be used to recognize a name represents this type
+         * @param isStreamable whether components of this type should be streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type create(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
+        public static Type create(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)
         {
-            return new Type(name, repr, false, formatClass);
+            return new Type(name, repr, false, isStreamable, formatClass);
         }
 
         /**
          * Creates a new singleton type and registers it in a global type registry - see {@link #registerType(Type)}.
          *
-         * @param name        type name, must be unique for this and all parent formats
-         * @param repr        the regular expression to be used to recognize a name represents this type
-         * @param formatClass format class for which this type is defined for
+         * @param name         type name, must be unique for this and all parent formats
+         * @param repr         the regular expression to be used to recognize a name represents this type
+         * @param isStreamable whether components of this type should be streamed to other nodes
+         * @param formatClass  format class for which this type is defined for
          */
-        public static Type createSingleton(String name, String repr, Class<? extends SSTableFormat<?, ?>> formatClass)
+        public static Type createSingleton(String name, String repr, boolean isStreamable, Class<? extends SSTableFormat<?, ?>> formatClass)

Review Comment:
   Fixed
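   With `isStreamable` on the type, the streaming code can decide what to send by looking up each component's type instead of special-casing file names. The following is a minimal sketch of that idea. `ComponentType`, `fromFileName` and `streamableOf` are hypothetical simplifications, not Cassandra's actual `Component.Type` API, and the patterns used with it are illustrative:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.regex.Pattern;

   // Hypothetical, simplified stand-in for Component.Type; not Cassandra's real class.
   final class ComponentType
   {
       // Global registry keyed by type name; names must be unique.
       private static final Map<String, ComponentType> REGISTRY = new LinkedHashMap<>();

       final String name;
       final Pattern repr;          // null means "no file-name pattern" (like CUSTOM)
       final boolean isStreamable;  // whether files of this type are sent to other nodes

       private ComponentType(String name, String repr, boolean isStreamable)
       {
           this.name = name;
           this.repr = repr == null ? null : Pattern.compile(repr);
           this.isStreamable = isStreamable;
       }

       static ComponentType create(String name, String repr, boolean isStreamable)
       {
           ComponentType type = new ComponentType(name, repr, isStreamable);
           if (REGISTRY.putIfAbsent(name, type) != null)
               throw new IllegalArgumentException("Type already registered: " + name);
           return type;
       }

       // Resolves a component file name (e.g. "Data.db") to its registered type, or null.
       static ComponentType fromFileName(String fileName)
       {
           for (ComponentType type : REGISTRY.values())
               if (type.repr != null && type.repr.matcher(fileName).matches())
                   return type;
           return null;
       }

       // Keeps only the file names whose registered type is streamable.
       static List<String> streamableOf(List<String> fileNames)
       {
           List<String> result = new ArrayList<>();
           for (String fileName : fileNames)
           {
               ComponentType type = fromFileName(fileName);
               if (type != null && type.isStreamable)
                   result.add(fileName);
           }
           return Collections.unmodifiableList(result);
       }
   }
   ```

   Usage: after registering `DATA` (`"Data\\.db"`, streamable) and `TOC` (`"TOC\\.txt"`, not streamable), `streamableOf(Arrays.asList("Data.db", "TOC.txt"))` keeps only `Data.db`.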



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1240080694


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);
             // custom component, used by e.g. custom compaction strategy
-            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, null);
+            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, true, null);

Review Comment:
   I guess not all CUSTOM components should be streamable?
   
   Is the plan here to just create something like the other `Types` inner classes, but in SAI's `V1OnDiskFormat`? If we did that, we could specify "streamability" there in those types. For example:
   
   `Component.Type TERMS_DATA = Component.Type.create("TERMS_DATA", "SAI\\+.*\\+TermsData.db", true, null);`
   
   This is really where all the stuff in `IndexComponent` probably should have gone in the first place.
   
   CC @mike-tr-adamson 
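   A minimal sketch of what such a per-format holder could look like, with each type carrying its own streamability. Only the `TERMS_DATA` name and pattern come from the comment above; `SaiTypes`, its nested `Type`, `shouldStream` and the non-streamable `LOCAL_ONLY` type are hypothetical:

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.regex.Pattern;

   // Hypothetical sketch of a per-format "Types" holder for SAI; not Cassandra's actual API.
   final class SaiTypes
   {
       static final class Type
       {
           final String name;
           final Pattern repr;
           final boolean streamable;

           Type(String name, String repr, boolean streamable)
           {
               this.name = name;
               this.repr = Pattern.compile(repr);
               this.streamable = streamable;
           }

           boolean matches(String componentName)
           {
               return repr.matcher(componentName).matches();
           }
       }

       // Pattern taken from the review comment. The unescaped '.' before "db"
       // matches any character; "SAI\\+.*\\+TermsData\\.db" would be stricter.
       static final Type TERMS_DATA = new Type("TERMS_DATA", "SAI\\+.*\\+TermsData.db", true);

       // Hypothetical node-local component, shown only to illustrate a non-streamable type.
       static final Type LOCAL_ONLY = new Type("LOCAL_ONLY", "SAI\\+.*\\+LocalOnly\\.db", false);

       static final List<Type> ALL = Collections.unmodifiableList(Arrays.asList(TERMS_DATA, LOCAL_ONLY));

       // True only if the name matches a declared type that should be streamed.
       static boolean shouldStream(String componentName)
       {
           for (Type type : ALL)
               if (type.matches(componentName))
                   return type.streamable;
           return false;
       }
   }
   ```

   For an illustrative component name such as `SAI+aa+ks_tbl_idx+TermsData.db`, `shouldStream` returns `true`; names that match no declared type are not streamed.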



[GitHub] [cassandra] maedhroz closed pull request #2420: CASSANDRA-18345 Stream all components registered by an SSTable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz closed pull request #2420: CASSANDRA-18345 Stream all components registered by an SSTable
URL: https://github.com/apache/cassandra/pull/2420


[GitHub] [cassandra] maedhroz commented on pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#issuecomment-1594960281

   nit: The J11 checkstyle run reports some unused imports.


[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1240043268


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);

Review Comment:
   nit: Are `SECONDARY_INDEX` components streamable? I thought we always rebuilt them at the receiver...
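   If the receiver always rebuilds built-in secondary indexes, the flag effectively splits an SSTable's components into those that are sent and those the receiver produces locally. A minimal sketch of that split, assuming a hypothetical `Kind` enum whose flags follow this discussion (`TOC` as in the diff, `SECONDARY_INDEX` as the reviewer expects); it is an illustration, not the PR's code:

   ```java
   import java.util.EnumSet;
   import java.util.Set;

   // Hypothetical sketch: which component kinds are sent versus produced by the receiver.
   final class StreamPlanSketch
   {
       enum Kind
       {
           DATA(true),
           STATS(true),
           TOC(false),             // not streamed, as in the diff
           SECONDARY_INDEX(false); // assumed rebuilt at the receiver, per the review question

           final boolean streamable;

           Kind(boolean streamable)
           {
               this.streamable = streamable;
           }
       }

       // Components the sender should transfer.
       static Set<Kind> toSend(Set<Kind> present)
       {
           Set<Kind> result = EnumSet.noneOf(Kind.class);
           for (Kind kind : present)
               if (kind.streamable)
                   result.add(kind);
           return result;
       }

       // Components the receiver must produce on its own after the transfer.
       static Set<Kind> toRegenerate(Set<Kind> present)
       {
           Set<Kind> result = EnumSet.noneOf(Kind.class);
           result.addAll(present);
           result.removeAll(toSend(present));
           return result;
       }
   }
   ```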



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1235723689


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    // zero copy streaming sends all components, so the events will include non-Data files as well
+    private static final int NUM_COMPONENTS = 17;
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+            // all cases of the vtable
+            cluster.schemaChange(withKeyspace(
+                "CREATE TABLE %s.test (pk int PRIMARY KEY, v text, b blob) WITH compression = { 'enabled' : false };"
+            ));
+            cluster.schemaChange(withKeyspace(
+                "CREATE CUSTOM INDEX ON %s.test(v) USING 'StorageAttachedIndex';"

Review Comment:
   Once numerics support merges, I guess we'll want to parameterize this test further to cover numeric indexes as well. The number of expected index components might not actually change, though. CC @mike-tr-adamson
   
   I'm not sure whether that means we should merge this first and adjust the test in a rebased CASSANDRA-18067, or the other way around.



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1241794018


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);

Review Comment:
   Good catch. I forgot to change it. 



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242495035


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();

Review Comment:
   Nice. We'll expand on this whenever numerics merge.
   
   CC @mike-tr-adamson 
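   The test's `sstableStreamingComponentsCount()` helper subtracts 1 for the compression-info component, presumably because the table is created with compression disabled. A hedged sketch of the same arithmetic that names the excluded components explicitly and multiplies the per-index components by the number of indexes, assuming the literal components are written once per index; this may ease adding numeric indexes later. All names and lists are placeholders, not the real contents of `allComponents()`, `PER_SSTABLE_COMPONENTS` or `LITERAL_COMPONENTS`:

   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch of the expected-component arithmetic; all names are placeholders.
   final class ExpectedComponents
   {
       // Streamable sstable components (minus ones the test setup never creates),
       // plus components shared by all indexes on the sstable,
       // plus the per-index components for each index.
       static int expectedCount(Map<String, Boolean> sstableComponents, // name -> streamable
                                List<String> excluded,
                                List<String> perSSTableIndexComponents,
                                List<String> perIndexComponents,
                                int indexCount)
       {
           int sstable = 0;
           for (Map.Entry<String, Boolean> e : sstableComponents.entrySet())
               if (e.getValue() && !excluded.contains(e.getKey()))
                   sstable++;
           return sstable + perSSTableIndexComponents.size() + indexCount * perIndexComponents.size();
       }
   }
   ```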



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243358391


##########
src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java:
##########
@@ -79,10 +84,20 @@ public enum IndexComponent
      */
     GROUP_COMPLETION_MARKER("GroupComplete");
 
+    public final Component.Type type;

Review Comment:
   +1 



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242510685


##########
src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java:
##########
@@ -79,10 +84,20 @@ public enum IndexComponent
      */
     GROUP_COMPLETION_MARKER("GroupComplete");
 
+    public final Component.Type type;
     public final String name;
 
     IndexComponent(String name)
     {
         this.name = name;
+        this.type = componentType(name);
+    }
+
+    private static Component.Type componentType(String name)
+    {
+        return Component.Type.create(SAI_DESCRIPTOR + SAI_SEPARATOR + name.toUpperCase(),

Review Comment:
   nit: `toUpperCase()` here seems unnecessary, as it will give us weird names like "GROUPCOMPLETE". Is camel case OK?
   
   We might even be able to use `name` as-is. The SSTable formats don't namespace their type names either, although they happen not to conflict.
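   To make the naming question concrete, a small sketch that prints both variants for `GroupComplete`. The values `"SAI"` and `"+"` for `SAI_DESCRIPTOR` and `SAI_SEPARATOR` are assumptions for illustration, and `ComponentNaming` is hypothetical. If upper-casing is kept, passing `Locale.ROOT` avoids the no-argument overload's dependence on the default locale (for example, Turkish casing of `i`):

   ```java
   import java.util.Locale;

   // Hypothetical sketch of the type-name options discussed above; constant values are assumed.
   final class ComponentNaming
   {
       static final String SAI_DESCRIPTOR = "SAI"; // assumed value
       static final String SAI_SEPARATOR = "+";    // assumed value

       // What the diff does: prefix plus the upper-cased component name.
       static String upperCased(String name)
       {
           return SAI_DESCRIPTOR + SAI_SEPARATOR + name.toUpperCase(Locale.ROOT);
       }

       // The reviewer's alternative: keep the original camel-case name.
       static String camelCase(String name)
       {
           return SAI_DESCRIPTOR + SAI_SEPARATOR + name;
       }

       public static void main(String[] args)
       {
           System.out.println(upperCased("GroupComplete")); // prints SAI+GROUPCOMPLETE
           System.out.println(camelCase("GroupComplete"));  // prints SAI+GroupComplete
       }
   }
   ```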



[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242514042


##########
src/java/org/apache/cassandra/index/sai/disk/format/IndexComponent.java:
##########
@@ -79,10 +84,20 @@ public enum IndexComponent
      */
     GROUP_COMPLETION_MARKER("GroupComplete");
 
+    public final Component.Type type;

Review Comment:
   super OCD nit: maybe `type` could go below `name` so they're in the same order in the ctor :D



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243324417


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);
             // custom component, used by e.g. custom compaction strategy
-            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, null);
+            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, true, null);

Review Comment:
   Fixed



[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243332599


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))

Review Comment:
   Indeed. This test was mostly copied from RebuildStreamingTest, which has the same long lines.
   Do you want me to clean up the long lines there as well?





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243329696


##########
src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java:
##########
@@ -82,9 +82,9 @@ public static class Components extends SSTableFormat.Components
         public static class Types extends SSTableFormat.Components.Types
         {
             // index of the row keys with pointers to their positions in the data file
-            public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", BigFormat.class);
+            public static final Component.Type PRIMARY_INDEX = Component.Type.createSingleton("PRIMARY_INDEX", "Index.db", true, BigFormat.class);

Review Comment:
   Fixed





[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1237338063


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -61,12 +58,8 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if (!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component detected %s",
-                                            Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
-

Review Comment:
   > I think we need to take another look at how custom components are registered and simply allow modules to provide their own types. So SAI component types should be defined in SAI. We could also allow the component type to decide whether it should be streamed or not.
   
   I think this would work. +1





[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: CASSANDRA-18345 Stream all components registered by an SSTable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1245689683


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,16 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        @SuppressWarnings({"resource", "RedundantSuppression"})  // all writers are closed in close()

Review Comment:
   Curious...can individual writers fail and cause some writers not to be closed? (above, on 197)
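One way to address the question above is a close loop that keeps going when one writer fails, so no writer is left open. This is a minimal sketch: `CloseAll.closeAll` is a hypothetical helper (not from the PR), and it assumes the writers are plain `java.io.Closeable` instances rather than Cassandra's `SequentialWriter`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the PR: closes every resource even if some fail.
final class CloseAll
{
    private CloseAll() {}

    /**
     * Attempts to close all resources. The first failure is rethrown after the loop;
     * any later failures are attached to it as suppressed exceptions.
     */
    static void closeAll(Iterable<? extends Closeable> resources) throws IOException
    {
        Throwable first = null;
        for (Closeable resource : resources)
        {
            try
            {
                resource.close();
            }
            catch (Throwable t)
            {
                if (first == null)
                    first = t;
                else
                    first.addSuppressed(t);
            }
        }
        if (first instanceof IOException)
            throw (IOException) first;
        if (first instanceof RuntimeException)
            throw (RuntimeException) first;
        if (first instanceof Error)
            throw (Error) first;
        // close() can only throw IOException, RuntimeException or Error, so nothing else reaches here.
    }
}
```

With this shape, a failure in one writer is still reported, but it no longer prevents the remaining writers from being closed.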





[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1240092114


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);
             // custom component, used by e.g. custom compaction strategy
-            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, null);
+            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, true, null);

Review Comment:
   ...or more completely, something like...
   
   ```
   public static class Components
   {
       public static class Types extends AbstractSSTableFormat.Components.Types
       {
           public static final Component.Type TERMS_DATA = Component.Type.create("TermsData", "SAI\\+.*\\+TermsData.db", true, null);
   ...
       }
   
       public final static Component TERMS_DATA = Types.TERMS_DATA.getSingleton();
   ...
   }
   ```
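A simplified, self-contained sketch of the suggestion above, in which a module declares its own component types and each type carries a streamable flag. `ComponentType` and `IndexComponentTypes` are hypothetical stand-ins; the real `Component.Type.create` shown in the diff also takes a format class, which is omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for Component.Type; not Cassandra's real class.
final class ComponentType
{
    private static final Map<String, ComponentType> REGISTRY = new LinkedHashMap<>();

    final String name;
    final Pattern repr;        // file-name pattern identifying components of this type
    final boolean streamable;  // whether components of this type are sent when streaming

    private ComponentType(String name, String repr, boolean streamable)
    {
        this.name = name;
        this.repr = Pattern.compile(repr);
        this.streamable = streamable;
    }

    static synchronized ComponentType create(String name, String repr, boolean streamable)
    {
        if (REGISTRY.containsKey(name))
            throw new IllegalArgumentException("Duplicate component type " + name);
        ComponentType type = new ComponentType(name, repr, streamable);
        REGISTRY.put(name, type);
        return type;
    }

    /** Returns the registered type whose pattern matches the file name, or null if none does. */
    static synchronized ComponentType fromFileName(String fileName)
    {
        for (ComponentType type : REGISTRY.values())
            if (type.repr.matcher(fileName).matches())
                return type;
        return null;
    }
}

// A module (e.g. an index implementation) declaring its own types alongside its own code.
final class IndexComponentTypes
{
    static final ComponentType TERMS_DATA = ComponentType.create("TermsData", "SAI\\+.*\\+TermsData\\.db", true);
    static final ComponentType SCRATCH = ComponentType.create("Scratch", "Scratch\\.tmp", false);
}
```

Because each type carries its own flag, the zero-copy writer's check can ask the component directly instead of consulting a fixed per-format list.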





[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1242492118


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -61,12 +61,14 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if (!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component detected %s",
-                                            Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
+        Set<Component> unsupported = components.stream()
+                                               .filter(c -> !c.type.streamable)
+                                               .collect(Collectors.toSet());
+        if (!unsupported.isEmpty())
+            throw new AssertionError(format("Unsupported streaming component detected %s", unsupported));

Review Comment:
   ```suggestion
               throw new AssertionError(format("Unsupported streaming components detected: %s", unsupported));
   ```
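A self-contained sketch of the suggested check, assuming a hypothetical `StreamComponent` that exposes its type's streamable flag directly. Collecting into a `LinkedHashSet` keeps the components in the error message in request order.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical minimal component: a name plus the streamable flag of its type.
final class StreamComponent
{
    final String name;
    final boolean streamable;

    StreamComponent(String name, boolean streamable)
    {
        this.name = name;
        this.streamable = streamable;
    }

    @Override
    public String toString()
    {
        return name;
    }
}

final class StreamingCheck
{
    /** Throws if any requested component has a type that is not marked as streamable. */
    static void validate(Collection<StreamComponent> components)
    {
        Set<StreamComponent> unsupported = components.stream()
                                                     .filter(c -> !c.streamable)
                                                     .collect(Collectors.toCollection(LinkedHashSet::new));
        if (!unsupported.isEmpty())
            throw new AssertionError(String.format("Unsupported streaming components detected: %s", unsupported));
    }
}
```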





[GitHub] [cassandra] adelapena commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "adelapena (via GitHub)" <gi...@apache.org>.
adelapena commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243493717


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   You can add `@SuppressWarnings("resource")` with a brief comment about where it will be closed. We have a bunch of those, for example [this one](https://github.com/apache/cassandra/blob/eda0bcbefaf38e4069ac6db3a00937d778090083/src/java/org/apache/cassandra/service/reads/range/RangeCommands.java#L55).





[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1244018336


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   If you want IDEA to stop complaining, use `@SuppressWarnings({ "resource", "RedundantSuppression" })`
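A sketch of where the suppression goes: on the local variable declaration, with a comment naming the owner that closes the resource. `ComponentWriters` is hypothetical and uses `StringWriter` as a stand-in for `SequentialWriter`; `RedundantSuppression` is the IntelliJ inspection name from the comment above, and javac accepts suppression names it does not recognize.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical owner of per-component writers; StringWriter stands in for SequentialWriter.
final class ComponentWriters implements Closeable
{
    private final Map<String, StringWriter> writers = new HashMap<>();

    void open(String component)
    {
        writers.put(component, new StringWriter());
    }

    void write(String component, String data)
    {
        // The writer is owned by the map and closed in close(), so a leak warning here is a false positive.
        @SuppressWarnings({ "resource", "RedundantSuppression" })
        StringWriter writer = writers.get(component);
        if (writer == null)
            throw new IllegalArgumentException("No writer open for component " + component);
        writer.write(data);
    }

    String contents(String component)
    {
        return writers.get(component).toString();
    }

    @Override
    public void close() throws IOException
    {
        for (StringWriter writer : writers.values())
            writer.close();
    }
}
```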





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243336002


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {
+        return (int) DatabaseDescriptor.getSelectedSSTableFormat()
+                                       .allComponents()
+                                       .stream()
+                                       .filter(c -> c.type.streamable)
+                                       .count() - 1;  // -1 because we don't include the compression component
+    }
+
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        test(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        test(false);
+    }
+
+    private void test(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values())
+                                                             .set("stream_entire_sstables", zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
+                                           .start()))
+        {
+            // streaming sends events every 65k, so need to make sure that the files are larger than this to hit
+            // all cases of the vtable

Review Comment:
   Yup, moving it makes sense. Moved to above BLOB.





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243309480


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -61,12 +58,8 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if (!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component detected %s",
-                                            Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
-

Review Comment:
   Fixed





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243344973


##########
test/distributed/org/apache/cassandra/distributed/test/sai/IndexStreamingTest.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat;
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class IndexStreamingTest extends TestBaseImpl
+{
+    private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+    private static final int NUM_COMPONENTS;
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        NUM_COMPONENTS = sstableStreamingComponentsCount()
+                         + V1OnDiskFormat.PER_SSTABLE_COMPONENTS.size()
+                         + V1OnDiskFormat.LITERAL_COMPONENTS.size();
+    }
+
+    private static int sstableStreamingComponentsCount() {

Review Comment:
   Fixed





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1241798065


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -59,7 +60,18 @@ public interface SSTableFormat<R extends SSTableReader, W extends SSTableWriter>
      */
     Set<Component> allComponents();
 
-    Set<Component> streamingComponents();
+    /**
+     * Returns the components that should be streamed to other nodes on repair / rebuild.
+     * This includes only the core SSTable components produced by this format.
+     * Custom components registered by e.g. secondary indexes are not included.
+     * Use {@link SSTableReader#getStreamingComponents()} for the list of all components including the custom ones.
+     */
+    default Set<Component> streamingComponents()

Review Comment:
   Looks like we need it for testing. I'll think of a way to get rid of it completely.
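If the default method is kept, it can be derived from `allComponents()` and the per-type flag, so a format does not have to maintain a second list. A minimal sketch with hypothetical `SketchFormat` and `FormatComponent` types standing in for `SSTableFormat` and `Component`:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified component: a name plus its type's streamable flag.
final class FormatComponent
{
    final String name;
    final boolean streamable;

    FormatComponent(String name, boolean streamable)
    {
        this.name = name;
        this.streamable = streamable;
    }

    @Override
    public String toString()
    {
        return name;
    }
}

// Simplified stand-in for an sstable format interface; not Cassandra's SSTableFormat.
interface SketchFormat
{
    Set<FormatComponent> allComponents();

    /** Core components of this format whose type is marked streamable. */
    default Set<FormatComponent> streamingComponents()
    {
        return allComponents().stream()
                              .filter(c -> c.streamable)
                              .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```

For example, a format whose components are `Data.db`, `TOC.txt` (not streamable, as in the diff) and `Statistics.db` would report only `Data.db` and `Statistics.db` as streaming components.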





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1236596507


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -61,12 +58,8 @@ public SSTableZeroCopyWriter(Builder<?, ?> builder,
         lifecycleNewTracker.trackNew(this);
         this.componentWriters = new HashMap<>();
 
-        if (!descriptor.getFormat().streamingComponents().containsAll(components))
-            throw new AssertionError(format("Unsupported streaming component detected %s",
-                                            Sets.difference(ImmutableSet.copyOf(components), descriptor.getFormat().streamingComponents())));
-

Review Comment:
   Why do we want to artificially restrict the set of streamed components in a place that can stream any component?
   It's not as if streaming supports only some kinds of components and not others. It will copy anything we throw at it.
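The point that streaming is type-agnostic can be illustrated by what the receiver actually needs per component: a destination and a byte count. A minimal sketch; `ComponentCopier.copy` is hypothetical and uses plain `InputStream`/`OutputStream` instead of the `DataInputPlus` and `SequentialWriter` used by `writeComponent` in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper: copies exactly `size` bytes of one component, whatever its type.
final class ComponentCopier
{
    private ComponentCopier() {}

    static long copy(InputStream in, OutputStream out, long size) throws IOException
    {
        byte[] buffer = new byte[8192];
        long remaining = size;
        while (remaining > 0)
        {
            int toRead = (int) Math.min(buffer.length, remaining);
            int n = in.read(buffer, 0, toRead);
            if (n < 0)
                throw new EOFException("Stream ended with " + remaining + " bytes of the component left");
            out.write(buffer, 0, n);
            remaining -= n;
        }
        return size;
    }
}
```

Because the copy stops after exactly `size` bytes, the next component's bytes stay in the stream, and nothing in the loop depends on the component's type.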
   





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243522916


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   I'm not getting this error when building, and the project built fine in CircleCI.
   I tried adding `@SuppressWarnings("resource")`, but IntelliJ immediately marks it as redundant :shrug:





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243343810


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   That looks like a false positive to me. It isn't supposed to be closed here. All writers are closed in the `close()` method. How do I suppress this?
   





[GitHub] [cassandra] pkolaczk commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "pkolaczk (via GitHub)" <gi...@apache.org>.
pkolaczk commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1243582571


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,15 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        SequentialWriter writer = componentWriters.get(component.name);

Review Comment:
   Ah! Gotcha. So that's the difference: JDK 8. I was using JDK 11. Checking with JDK 8...




[GitHub] [cassandra] maedhroz commented on a diff in pull request #2420: Stream all components registered by an sstable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1240080694


##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -156,23 +168,23 @@ public static class Types
         {
             // the base data for an sstable: the remaining components can be regenerated
             // based on the data component
-            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", null);
+            public static final Component.Type DATA = Component.Type.createSingleton("DATA", "Data.db", true, null);
             // file to hold information about uncompressed data length, chunk offsets etc.
-            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", null);
+            public static final Component.Type COMPRESSION_INFO = Component.Type.createSingleton("COMPRESSION_INFO", "CompressionInfo.db", true, null);
             // statistical metadata about the content of the sstable
-            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", null);
+            public static final Component.Type STATS = Component.Type.createSingleton("STATS", "Statistics.db", true, null);
             // serialized bloom filter for the row keys in the sstable
-            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", null);
+            public static final Component.Type FILTER = Component.Type.createSingleton("FILTER", "Filter.db", true, null);
             // holds CRC32 checksum of the data file
-            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", null);
+            public static final Component.Type DIGEST = Component.Type.createSingleton("DIGEST", "Digest.crc32", true, null);
             // holds the CRC32 for chunks in an uncompressed file.
-            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", null);
+            public static final Component.Type CRC = Component.Type.createSingleton("CRC", "CRC.db", true, null);
             // table of contents, stores the list of all components for the sstable
-            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", null);
+            public static final Component.Type TOC = Component.Type.createSingleton("TOC", "TOC.txt", false, null);
             // built-in secondary index (may exist multiple per sstable)
-            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", null);
+            public static final Component.Type SECONDARY_INDEX = Component.Type.create("SECONDARY_INDEX", "SI_.*.db", true, null);
             // custom component, used by e.g. custom compaction strategy
-            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, null);
+            public static final Component.Type CUSTOM = Component.Type.create("CUSTOM", null, true, null);

Review Comment:
   I guess not all CUSTOM components should be streamable?
   
   Is the plan here to just create something like the other `Types` inner classes, but in SAI's `V1OnDiskFormat`? If we did that, we could specify "streamability" there in those types. For example:
   
   `Component.Type TERMS_DATA = Component.Type.create("TermsData", "SAI\\+.*\\+TermsData.db", true, null);`
   
   This is really where all the stuff in `IndexComponent` probably should have gone in the first place.
   
   CC @mike-tr-adamson 
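   A sketch of the idea in the comment above, assuming a simplified model in which each component type pairs a file-name regex with a streamability flag. `ComponentTypeSketch`, `StreamSelectionSketch`, and the file names in the example are hypothetical and are not Cassandra's `Component.Type` API. The `TermsData` pattern is the one proposed above, and the `DATA`/`TOC` flags follow the diff. A file is streamed only if the first type whose pattern matches it is streamable, so an SAI-owned type can opt its own components in or out instead of relying on a catch-all `CUSTOM` type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical, simplified component type: a file-name regex plus a streamability flag.
// This is not Cassandra's Component.Type API.
final class ComponentTypeSketch
{
    final String name;
    final Pattern fileNamePattern;
    final boolean streamable;

    ComponentTypeSketch(String name, String fileNameRegex, boolean streamable)
    {
        this.name = name;
        this.fileNamePattern = Pattern.compile(fileNameRegex);
        this.streamable = streamable;
    }

    boolean matches(String fileName)
    {
        return fileNamePattern.matcher(fileName).matches();
    }
}

final class StreamSelectionSketch
{
    // Built-in types, using the streamability flags from the diff.
    static final ComponentTypeSketch DATA = new ComponentTypeSketch("DATA", "Data\\.db", true);
    static final ComponentTypeSketch TOC = new ComponentTypeSketch("TOC", "TOC\\.txt", false);
    // A type owned by the index format itself, using the pattern proposed in the review.
    static final ComponentTypeSketch TERMS_DATA = new ComponentTypeSketch("TermsData", "SAI\\+.*\\+TermsData.db", true);

    static final List<ComponentTypeSketch> TYPES = Arrays.asList(DATA, TOC, TERMS_DATA);

    // A file is streamed only if the first type matching it is streamable;
    // files that match no known type are not streamed.
    static List<String> filesToStream(List<String> fileNames)
    {
        return fileNames.stream()
                        .filter(file -> TYPES.stream()
                                             .filter(type -> type.matches(file))
                                             .findFirst()
                                             .map(type -> type.streamable)
                                             .orElse(false))
                        .collect(Collectors.toList());
    }
}
```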




[GitHub] [cassandra] maedhroz commented on pull request #2420: CASSANDRA-18345 Stream all components registered by an SSTable

Posted by "maedhroz (via GitHub)" <gi...@apache.org>.
maedhroz commented on PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#issuecomment-1613791228

   Committed as https://github.com/apache/cassandra/commit/5d3f257477cba2d7f33f842dba4582d0660f5738



[GitHub] [cassandra] adelapena commented on a diff in pull request #2420: CASSANDRA-18345 Stream all components registered by an SSTable

Posted by "adelapena (via GitHub)" <gi...@apache.org>.
adelapena commented on code in PR #2420:
URL: https://github.com/apache/cassandra/pull/2420#discussion_r1246399215


##########
src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java:
##########
@@ -195,14 +197,16 @@ public void close()
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
+    public void writeComponent(Component component, DataInputPlus in, long size) throws ClosedChannelException
     {
-        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+        @SuppressWarnings({"resource", "RedundantSuppression"})  // all writers are closed in close()

Review Comment:
   I guess if that's the case, we could use `FBUtilities#closeAll` in `SSTableZeroCopyWriter#close`. But we are not modifying that method here, so I think it can be done in a separate ticket.
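   For reference, a sketch of close-everything semantics. It assumes the desired behavior is to attempt every close, keep going after a failure, and rethrow the first failure with later ones attached as suppressed exceptions. `CloseAllSketch.closeAll` is hypothetical and does not claim to reproduce the exact behavior of `FBUtilities#closeAll`.

```java
final class CloseAllSketch
{
    // Hypothetical helper (not FBUtilities#closeAll): closes every resource even if an
    // earlier close fails, then rethrows the first failure with the rest as suppressed.
    static void closeAll(Iterable<? extends AutoCloseable> resources) throws Exception
    {
        Exception failure = null;
        for (AutoCloseable resource : resources)
        {
            try
            {
                resource.close();
            }
            catch (Exception e)
            {
                if (failure == null)
                    failure = e;
                else
                    failure.addSuppressed(e);
            }
        }
        if (failure != null)
            throw failure;
    }
}
```

   Assuming `componentWriters` is a map of closeable writers, as the diff suggests, the body of `close()` would reduce to something along the lines of `closeAll(componentWriters.values())`.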


