Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2021/08/31 15:40:10 UTC

[GitHub] [cassandra] adelapena commented on a change in pull request #735: Cassandra 16092: CEP-7 port Index Group for Storage Attached Index

adelapena commented on a change in pull request #735:
URL: https://github.com/apache/cassandra/pull/735#discussion_r699437056



##########
File path: src/java/org/apache/cassandra/db/CassandraTableWriteHandler.java
##########
@@ -33,10 +33,10 @@ public CassandraTableWriteHandler(ColumnFamilyStore cfs)
 
     @Override
     @SuppressWarnings("resource")
-    public void write(PartitionUpdate update, WriteContext context, UpdateTransaction updateTransaction)
+    public void write(PartitionUpdate update, WriteContext context, boolean updateIndexes)

Review comment:
       Nit: the import for `UpdateTransaction` is now unused

##########
File path: src/java/org/apache/cassandra/db/ColumnIndex.java
##########
@@ -123,14 +123,20 @@ public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException
     private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
     {
         ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
+
+        long partitionDeletionPosition = writer.position();
         DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
+        if (!observers.isEmpty())
+            observers.forEach((o) -> o.partitionLevelDeletion(iterator.partitionLevelDeletion(), partitionDeletionPosition));

Review comment:
       Nit: we can drop the parentheses around the lambda's single argument, here and in the other two calls:
   ```suggestion
               observers.forEach(o -> o.partitionLevelDeletion(iterator.partitionLevelDeletion(), partitionDeletionPosition));
   ```
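
       For reference, the parentheses can only be dropped when the lambda takes exactly one parameter whose type is inferred. Zero parameters, several parameters, or an explicitly typed parameter still need them. A small self-contained illustration (the class and method names are made up for this example):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;

   // Hypothetical helper: both lambda forms produce equivalent Consumer<String> instances.
   final class LambdaStyle
   {
       static List<String> collect(List<String> items)
       {
           List<String> seen = new ArrayList<>();
           // Style flagged by the nit: redundant parentheses around a single inferred parameter.
           Consumer<String> withParens = (o) -> seen.add("a:" + o);
           // Preferred style. Parentheses would still be required for (), (k, v) or (String o).
           Consumer<String> withoutParens = o -> seen.add("b:" + o);
           items.forEach(withParens);
           items.forEach(withoutParens);
           return seen;
       }
   }
   ```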

##########
File path: src/java/org/apache/cassandra/index/SecondaryIndexManager.java
##########
@@ -1579,4 +1710,35 @@ public static void shutdownAndWait(long timeout, TimeUnit units) throws Interrup
         shutdown(asyncExecutor, blockingExecutor);
         awaitTermination(timeout, units, asyncExecutor, blockingExecutor);
     }
+
+    public void makeIndexNonQueryable(Index index, Index.Status status)
+    {
+        String name = index.getIndexMetadata().name;
+        if (indexes.get(name) == index)
+        {
+            if (!index.isQueryable(status))
+                queryableIndexes.remove(name);
+        }
+    }
+
+    public void makeIndexQueryable(Index index, Index.Status status)
+    {
+        String name = index.getIndexMetadata().name;
+        if (indexes.get(name) == index)
+        {
+            if (index.isQueryable(status))
+            {
+                if (queryableIndexes.add(name))
+                    logger.info("Index [{}] became queryable after successful build.", name);
+            }
+
+            if (writableIndexes.put(name, index) == null)
+                logger.info("Index [{}] became writable after successful build.", name);
+        }
+    }
+
+    private static String identifier(String keyspace, String index)
+    {
+        return new StringBuilder().append(keyspace).append('.').append(index).toString();
+    }

Review comment:
       This method doesn't seem to be used.

##########
File path: src/java/org/apache/cassandra/db/Keyspace.java
##########
@@ -633,7 +632,6 @@ else if (isDeferrable)
                     columnFamilyStores.get(tableId).metric.viewLockAcquireTime.update(acquireTime, MILLISECONDS);
             }
         }
-        int nowInSec = FBUtilities.nowInSeconds();

Review comment:
       Nit: this leaves an unused import of `FBUtilities`

##########
File path: src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
##########
@@ -40,16 +43,70 @@
     void startPartition(DecoratedKey key, long indexPosition);
 
     /**
-     * Called after the unfiltered cluster is written to the sstable.
-     * Will be preceded by a call to {@code startPartition(DecoratedKey, long)},
-     * and the cluster should be assumed to belong to that partition.
+     * Called when the deletion time of a partition is written to the sstable.
      *
-     * @param unfilteredCluster The unfiltered cluster being added to SSTable.
+     * Will be preceded by a call to {@link #startPartition(DecoratedKey, long)},
+     * and the deletion time should be assumed to belong to that partition.
+     *
+     * @param deletionTime the partition-level deletion time being written to the SSTable
+     * @param position the position of the written deletion time in the data file,
+     * as required by {@link SSTableReader#partitionLevelDeletionAt(long)}
+     */
+    void partitionLevelDeletion(DeletionTime deletionTime, long position);
+
+    /**
+     * Called when the static row of a partition is written to the sstable.
+     *
+     * Will be preceded by a call to {@link #startPartition(DecoratedKey, long)},
+     * and the static row should be assumed to belong to that partition.
+     *
+     * @param staticRow the static row being written to the SSTable
+     * @param position the position of the written static row in the data file,
+     * as required by {@link SSTableReader#staticRowAt(long, ColumnFilter)}
      */
-    void nextUnfilteredCluster(Unfiltered unfilteredCluster);
+    void staticRow(Row staticRow, long position);
+
+    /**
+     * Called after an unfiltered is written to the sstable.
+     *
+     * Will be preceded by a call to {@link #startPartition(DecoratedKey, long)},
+     * and the unfiltered should be assumed to belong to that partition.
+     *
+     * Implementations overriding {@link #nextUnfilteredCluster(Unfiltered, long)} shouldn't implement this method
+     * since only one of the two methods is required.
+     *
+     * @param unfiltered the unfiltered being written to the SSTable
+     */
+    default void nextUnfilteredCluster(Unfiltered unfiltered)
+    {
+    }
+
+    /**
+     * Called after an unfiltered is written to the sstable.
+     *
+     * Will be preceded by a call to {@link #startPartition(DecoratedKey, long)},
+     * and the unfiltered should be assumed to belong to that partition.
+     *
+     * Implementations overriding {@link #nextUnfilteredCluster(Unfiltered)} shouldn't implement this method
+     * since only one of the two methods is required.
+     *
+     * @param unfiltered the unfiltered being written to the SSTable
+     * @param position the position of the written unfiltered in the data file,
+     * as required by {@link SSTableReader#clusteringAt(long)}
+     * and {@link SSTableReader#unfilteredAt(long, ColumnFilter)}
+     */
+    default void nextUnfilteredCluster(Unfiltered unfiltered, long position)
+    {
+        nextUnfilteredCluster(unfiltered);
+    }
 
     /**
      * Called when all data is written to the file and it's ready to be finished up.
      */
     void complete();
+
+    /**
+     * Clean up resources on error. There should be no side effects if called multiple times.
+     */
+    default void abort(Throwable accumulator) {}

Review comment:
       We could annotate this method with `@SuppressWarnings("unused")`
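
       To illustrate why the overload pair works and where the annotation would go, here is a minimal sketch with a hypothetical `FlushObserverSketch` interface in which `String` stands in for `Unfiltered`. In this sketch the writer calls the position-aware overload, whose default body forwards to the position-less one, so an observer only overrides whichever method it needs. Note that `javac` itself doesn't report unused parameters, so `@SuppressWarnings("unused")` mostly silences IDE inspections.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified stand-in for SSTableFlushObserver (String replaces Unfiltered).
   interface FlushObserverSketch
   {
       // Position-less callback: a no-op by default.
       default void nextUnfilteredCluster(String unfiltered)
       {
       }

       // Position-aware callback: forwards to the position-less overload by default.
       default void nextUnfilteredCluster(String unfiltered, long position)
       {
           nextUnfilteredCluster(unfiltered);
       }

       // The default body ignores its parameter, hence the suggested annotation.
       @SuppressWarnings("unused")
       default void abort(Throwable accumulator)
       {
       }
   }

   // Overrides only the old overload: still notified through the forwarding default.
   final class LegacyObserver implements FlushObserverSketch
   {
       final List<String> seen = new ArrayList<>();

       @Override
       public void nextUnfilteredCluster(String unfiltered)
       {
           seen.add(unfiltered);
       }
   }

   // Overrides only the new overload: also receives the data file position.
   final class PositionAwareObserver implements FlushObserverSketch
   {
       final List<String> seen = new ArrayList<>();

       @Override
       public void nextUnfilteredCluster(String unfiltered, long position)
       {
           seen.add(unfiltered + "@" + position);
       }
   }
   ```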

##########
File path: src/java/org/apache/cassandra/db/ReadCommand.java
##########
@@ -389,16 +385,23 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
         long startTimeNanos = System.nanoTime();
 
         ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
-        Index index = getIndex(cfs);
+        Index.QueryPlan indexQueryPlan = indexQueryPlan();
 
         Index.Searcher searcher = null;
-        if (index != null)
+        if (indexQueryPlan != null)
         {
-            if (!cfs.indexManager.isIndexQueryable(index))
-                throw new IndexNotAvailableException(index);
-
-            searcher = index.searcherFor(this);
-            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
+            cfs.indexManager.checkQueryability(indexQueryPlan);
+
+            Index index = indexQueryPlan.getFirst();

Review comment:
       I think this declaration is unneeded

##########
File path: src/java/org/apache/cassandra/index/Index.java
##########
@@ -593,10 +602,296 @@ default boolean supportsReplicaFilteringProtection(RowFilter rowFilter)
      */
     public interface Searcher
     {
+        /**
+         * Returns the {@link ReadCommand} for which this searcher has been created.
+         *
+         * @return the base read command
+         */
+        ReadCommand command();
+
         /**
          * @param executionController the collection of OpOrder.Groups which the ReadCommand is being performed under.
          * @return partitions from the base table matching the criteria of the search.
          */
         public UnfilteredPartitionIterator search(ReadExecutionController executionController);
+
+        /**
+         * Replica filtering protection may fetch data that doesn't match query conditions.
+         *
+         * On coordinator, we need to filter the replicas' responses again.
+         *
+         * @return filtered response that satisfied query conditions
+         */
+        default PartitionIterator filterReplicaFilteringProtection(PartitionIterator fullResponse)

Review comment:
       We might mention here that this won't be called if `QueryPlan#supportsReplicaFilteringProtection` returns `false`.
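
       Something along these lines, perhaps. The sketch below uses hypothetical stand-ins (`QueryPlanSketch`, `SearcherSketch`, `MatchingSearcher`, `CoordinatorSketch`, with `List<String>` in place of `PartitionIterator`) to show the contract the extra Javadoc sentence would document: the coordinator consults the searcher's filter only when the plan supports replica filtering protection.

   ```java
   import java.util.List;
   import java.util.function.Predicate;
   import java.util.stream.Collectors;

   // Hypothetical stand-in for Index.QueryPlan.
   interface QueryPlanSketch
   {
       boolean supportsReplicaFilteringProtection();
   }

   // Hypothetical stand-in for Index.Searcher.
   interface SearcherSketch
   {
       /**
        * Re-filters the merged replica responses on the coordinator.
        * This method is not called if {@link QueryPlanSketch#supportsReplicaFilteringProtection()}
        * returns {@code false}.
        */
       default List<String> filterReplicaFilteringProtection(List<String> fullResponse)
       {
           return fullResponse;
       }
   }

   // Re-applies the query condition, since RFP may have fetched non-matching rows.
   final class MatchingSearcher implements SearcherSketch
   {
       private final Predicate<String> condition;

       MatchingSearcher(Predicate<String> condition)
       {
           this.condition = condition;
       }

       @Override
       public List<String> filterReplicaFilteringProtection(List<String> fullResponse)
       {
           return fullResponse.stream().filter(condition).collect(Collectors.toList());
       }
   }

   final class CoordinatorSketch
   {
       static List<String> finish(QueryPlanSketch plan, SearcherSketch searcher, List<String> merged)
       {
           // Without RFP support the searcher hook is never consulted.
           if (!plan.supportsReplicaFilteringProtection())
               return merged;
           return searcher.filterReplicaFilteringProtection(merged);
       }
   }
   ```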

##########
File path: test/unit/org/apache/cassandra/index/StubIndexGroup.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.WriteContext;
+import org.apache.cassandra.db.compaction.OperationType;

Review comment:
       Nit: unused

##########
File path: src/java/org/apache/cassandra/index/internal/CassandraIndex.java
##########
@@ -271,14 +270,6 @@ public long getEstimatedResultRows()
         return indexCfs.getMeanRowCount();
     }
 
-    /**
-     * No post processing of query results, just return them unchanged
-     */
-    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)

Review comment:
       Nit: this leaves unused imports of `BiFunction` and `PartitionIterator`

##########
File path: src/java/org/apache/cassandra/db/Memtable.java
##########
@@ -306,8 +310,19 @@ public int partitionCount()
         return partitions.size();
     }
 
+    /**
+     * @return current {@link LifecycleNewTracker} used to flush this memtable
+     */
+    public LifecycleNewTracker tracker()

Review comment:
       This is `@Nullable`, isn't it?

##########
File path: src/java/org/apache/cassandra/index/SecondaryIndexManager.java
##########
@@ -1579,4 +1710,35 @@ public static void shutdownAndWait(long timeout, TimeUnit units) throws Interrup
         shutdown(asyncExecutor, blockingExecutor);
         awaitTermination(timeout, units, asyncExecutor, blockingExecutor);
     }
+
+    public void makeIndexNonQueryable(Index index, Index.Status status)

Review comment:
       Maybe we could make these two methods (`makeIndexQueryable` and `makeIndexNonQueryable`) private and inline the status argument, unless we plan to add other callers.
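
       For illustration, a simplified sketch of what the private variant could look like, using a hypothetical `IndexRegistrySketch` and a made-up `BuildStatus` enum in place of `Index.Status`. It also spells out why the original logs each transition only once: `Set#add` returns `true` only when the name wasn't already present, `Map#put` returns `null` only when there was no previous mapping, and the identity check ignores instances that are no longer the registered one.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical, simplified model of the SecondaryIndexManager bookkeeping.
   final class IndexRegistrySketch
   {
       // Made-up stand-in for Index.Status.
       enum BuildStatus { SUCCEEDED, FAILED }

       static final class IndexStub
       {
           final String name;

           IndexStub(String name)
           {
               this.name = name;
           }

           boolean isQueryable(BuildStatus status)
           {
               return status == BuildStatus.SUCCEEDED;
           }
       }

       private final Map<String, IndexStub> indexes = new HashMap<>();
       private final Set<String> queryableIndexes = new HashSet<>();
       private final Map<String, IndexStub> writableIndexes = new HashMap<>();
       final List<String> log = new ArrayList<>();

       void register(IndexStub index)
       {
           indexes.put(index.name, index);
       }

       // The only callers, so the status can be inlined into the private helpers.
       void onBuildSucceeded(IndexStub index)
       {
           makeIndexQueryable(index);
       }

       void onBuildFailed(IndexStub index)
       {
           makeIndexNonQueryable(index);
       }

       boolean isQueryable(String name)
       {
           return queryableIndexes.contains(name);
       }

       private void makeIndexQueryable(IndexStub index)
       {
           // Ignore stale instances that have been replaced by a newer registration.
           if (indexes.get(index.name) != index)
               return;
           if (index.isQueryable(BuildStatus.SUCCEEDED) && queryableIndexes.add(index.name))
               log.add(index.name + " became queryable");
           if (writableIndexes.put(index.name, index) == null)
               log.add(index.name + " became writable");
       }

       private void makeIndexNonQueryable(IndexStub index)
       {
           if (indexes.get(index.name) == index && !index.isQueryable(BuildStatus.FAILED))
               queryableIndexes.remove(index.name);
       }
   }
   ```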

##########
File path: src/java/org/apache/cassandra/index/SecondaryIndexManager.java
##########
@@ -1164,23 +1207,50 @@ public void validate(PartitionUpdate update) throws InvalidRequestException
     /*
      * IndexRegistry methods
      */
-
-    public void registerIndex(Index index)
+    public void registerIndex(Index index, Object groupKey, Supplier<Index.Group> groupSupplier)
     {
         String name = index.getIndexMetadata().name;
         indexes.put(name, index);
         logger.trace("Registered index {}", name);
-    }
 
-    public void unregisterIndex(Index index)
-    {
-        unregisterIndex(index.getIndexMetadata().name);
+        // instantiate and add the index group if it hasn't been already added
+        Index.Group group = indexGroups.computeIfAbsent(groupKey, k -> groupSupplier.get());
+
+        // add the created index to its group if it is not a singleton group
+        if (!(group instanceof SingletonIndexGroup))
+        {
+            if (index.getBackingTable().isPresent())

Review comment:
       We could mention this limitation in the JavaDoc for `Index#getBackingTable`.
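
       Unrelated to the JavaDoc itself, but for anyone reading the grouping logic above: a minimal sketch of the `computeIfAbsent` step, with hypothetical `Group`/`SingletonGroup` classes and `String` index names (a plain `HashMap` is used for simplicity). The supplier only runs the first time a group key is seen, later indexes with the same key join the existing group, and singleton groups don't track members. The backing-table check that follows in the diff is elided there and is not modeled here.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Supplier;

   // Hypothetical, simplified model of registerIndex(index, groupKey, groupSupplier).
   final class GroupRegistrySketch
   {
       static class Group
       {
           final List<String> members = new ArrayList<>();
       }

       // Marker for groups that wrap exactly one index.
       static final class SingletonGroup extends Group
       {
       }

       final Map<Object, Group> indexGroups = new HashMap<>();
       int groupsCreated = 0;

       void registerIndex(String indexName, Object groupKey, Supplier<Group> groupSupplier)
       {
           // The mapping function runs only when groupKey has no group yet.
           Group group = indexGroups.computeIfAbsent(groupKey, k -> {
               groupsCreated++;
               return groupSupplier.get();
           });

           // Only shared (non-singleton) groups keep track of their member indexes.
           if (!(group instanceof SingletonGroup))
               group.members.add(indexName);
       }
   }
   ```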




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
For additional commands, e-mail: pr-help@cassandra.apache.org