Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2023/01/13 15:00:23 UTC

[GitHub] [cassandra] blambov commented on a diff in pull request #2064: CASSANDRA-17056: CEP-17

blambov commented on code in PR #2064:
URL: https://github.com/apache/cassandra/pull/2064#discussion_r1068100487


##########
src/java/org/apache/cassandra/db/compaction/Scrubber.java:
##########
@@ -564,20 +565,27 @@ public ScrubResult(Scrubber scrubber)
     /**
      * During 2.x migration, under some circumstances rows might have gotten duplicated.
      * Merging iterator merges rows with same clustering.
-     *
+     * <p>
      * For more details, refer to CASSANDRA-12144.
      */
-    private static class RowMergingSSTableIterator extends WrappingUnfilteredRowIterator
+    private static class RowMergingSSTableIterator extends UnmodifiableIterator<Unfiltered> implements WrappingUnfilteredRowIterator

Review Comment:
   Couldn't we implement `remove` in the interface itself, so that `extends UnmodifiableIterator` is not needed here and elsewhere?
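
   A minimal sketch of that idea, assuming a hypothetical `WrappingIterator` interface (a stand-in, not the actual `WrappingUnfilteredRowIterator`): the interface declares `remove` once as a default method, so an implementor only supplies `wrapped()` and needs no unmodifiable base class. Note that `java.util.Iterator` has had a throwing default `remove` since Java 8; the explicit override below only makes the intent visible.

```java
import java.util.Iterator;

// Hypothetical stand-in for a wrapping iterator interface; not the Cassandra type.
interface WrappingIterator<T> extends Iterator<T>
{
    Iterator<T> wrapped();

    @Override
    default boolean hasNext()
    {
        return wrapped().hasNext();
    }

    @Override
    default T next()
    {
        return wrapped().next();
    }

    // Declared once here, so implementors do not need an unmodifiable base class.
    @Override
    default void remove()
    {
        throw new UnsupportedOperationException("remove");
    }
}

// An implementor now only has to supply wrapped().
final class ReadOnlyView<T> implements WrappingIterator<T>
{
    private final Iterator<T> source;

    ReadOnlyView(Iterator<T> source)
    {
        this.source = source;
    }

    @Override
    public Iterator<T> wrapped()
    {
        return source;
    }
}
```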



##########
src/java/org/apache/cassandra/service/CacheService.java:
##########
@@ -439,13 +440,14 @@ public void serialize(KeyCacheKey key, DataOutputPlus out, ColumnFamilyStore cfs
                 out.writeInt(Integer.MIN_VALUE); // backwards compatibility for "int based generation only"
                 ByteBufferUtil.writeWithShortLength(key.desc.id.asBytes(), out);
             }
-            out.writeBoolean(true);
-
-            SerializationHeader header = new SerializationHeader(false, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS);
-            new RowIndexEntry.Serializer(key.desc.version, header).serializeForCache(entry, out);
+            // format type id is stored so that in case there is no sstable for the key we can figure out which
+            // serializer (of which sstable format) was used and thus can use the right implementation to skip
+            // the unmatched entry
+            out.writeByte(key.desc.formatType.ordinal());

Review Comment:
   Would it be simpler to store a (varint) length here?
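
   To illustrate the suggestion: with a length prefix, a reader can skip an entry it cannot match without knowing which format serialized it. This is only a sketch over plain `java.io` streams; the class `LengthPrefixedEntries` and its LEB128-style varint helpers are hypothetical and are not Cassandra's vint encoding or `DataOutputPlus` API.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

final class LengthPrefixedEntries
{
    // Hypothetical LEB128-style unsigned varint: 7 payload bits per byte,
    // high bit set when more bytes follow.
    static void writeUnsignedVarInt(DataOutputStream out, int value) throws IOException
    {
        while ((value & ~0x7F) != 0)
        {
            out.writeByte((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.writeByte(value);
    }

    static int readUnsignedVarInt(DataInputStream in) throws IOException
    {
        int result = 0;
        int shift = 0;
        int b;
        do
        {
            b = in.readUnsignedByte();
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        while ((b & 0x80) != 0);
        return result;
    }

    // Writes the serialized form of one entry preceded by its length.
    static void writeEntry(DataOutputStream out, byte[] serialized) throws IOException
    {
        writeUnsignedVarInt(out, serialized.length);
        out.write(serialized);
    }

    // Skips one entry without interpreting its bytes, so no format-specific
    // serializer is required. Treats "no progress" as end of stream.
    static void skipEntry(DataInputStream in) throws IOException
    {
        int remaining = readUnsignedVarInt(in);
        while (remaining > 0)
        {
            int skipped = in.skipBytes(remaining);
            if (skipped <= 0)
                throw new EOFException("Truncated entry");
            remaining -= skipped;
        }
    }
}
```

   The trade-off in this sketch is that the writer must know the serialized size before writing the payload, for example by serializing into a buffer first.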



##########
src/java/org/apache/cassandra/io/sstable/AbstractSSTableIterator.java:
##########
@@ -382,31 +406,174 @@ public Unfiltered next()
         protected abstract boolean hasNextInternal() throws IOException;
         protected abstract Unfiltered nextInternal() throws IOException;
 
+        @Override
         public void close() throws IOException
         {
             if (shouldCloseFile && file != null)
                 file.close();
         }
+
+        @Override
+        public String toString()
+        {
+            return file != null ? file.toString() : "null";
+        }    }

Review Comment:
   Nit: formatting needs fixing.



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -36,6 +36,9 @@
     SSTableWriter.Factory getWriterFactory();
     SSTableReader.Factory getReaderFactory();
 
+    boolean isKeyCacheSupported();

Review Comment:
   I would call this `cachesKeys` (if the sstable impl takes care of doing the caching calls) or `cacheKeys`/`shouldCacheKeys` (if other code is doing it) -- the alternate implementation can support cached keys, but does not need to.



##########
src/java/org/apache/cassandra/schema/Schema.java:
##########
@@ -224,6 +225,15 @@ public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
                : null;
     }
 
+    public ColumnFamilyStore getColumnFamilyStoreInstance(TableMetadata metadata)
+    {
+        ColumnFamilyStore cfs = getColumnFamilyStoreInstance(metadata.id);
+        if (cfs == null || !metadata.isIndex())

Review Comment:
   Shouldn't this be `!=`?



##########
src/java/org/apache/cassandra/utils/Throwables.java:
##########
@@ -189,13 +189,19 @@ public static Throwable perform(Throwable accumulate, String filePath, FileOpTyp
     }
 
     public static void closeAndAddSuppressed(@Nonnull Throwable t, AutoCloseable... closeables)
+    {
+        closeAndAddSuppressed(t, Arrays.asList(closeables));
+    }
+
+    public static void closeAndAddSuppressed(@Nonnull Throwable t, Iterable<AutoCloseable> closeables)
     {
         Preconditions.checkNotNull(t);
         for (AutoCloseable closeable : closeables)
         {
             try
             {
-                closeable.close();
+                if (closeable != null)

Review Comment:
   I'm not sure we want to relax this. Usually `FileUtils.close` is used to close things that may include nulls.
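
   For comparison, a sketch of the strict behaviour (without the added null check). The class `CloseHelpers` is hypothetical, and the catch-all that records failures as suppressed exceptions is an assumption about the surrounding code, which the hunk above does not show. In this form a null element is not silently skipped: it surfaces as a suppressed `NullPointerException`, which points at the caller that passed it.

```java
import java.util.Objects;

final class CloseHelpers
{
    // Hypothetical strict variant: nulls are treated as a caller bug rather than skipped.
    static void closeAndAddSuppressed(Throwable t, Iterable<? extends AutoCloseable> closeables)
    {
        Objects.requireNonNull(t);
        for (AutoCloseable closeable : closeables)
        {
            try
            {
                closeable.close(); // throws NullPointerException for a null element
            }
            catch (Throwable suppressed)
            {
                // Keep closing the remaining resources and record the failure.
                t.addSuppressed(suppressed);
            }
        }
    }
}
```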



##########
src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java:
##########
@@ -125,6 +130,24 @@ public AbstractRowIndexEntry.KeyCacheValueSerializer<BigTableReader, RowIndexEnt
         return KeyCacheValueSerializer.instance;
     }
 
+    @Override
+    public BigTableReader cast(SSTableReader sstr)
+    {
+        return sstr == null ? null : (BigTableReader) sstr;

Review Comment:
   There is no need for the null check here. Nulls can be cast to any type.
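
   A tiny illustration of the language rule, using a hypothetical `NullCastDemo` class: casting a null reference to any reference type succeeds and yields null, so only a non-null value of the wrong type can fail the cast.

```java
final class NullCastDemo
{
    // No null check needed: a null reference can be cast to any reference type.
    static Integer castToInteger(Object value)
    {
        return (Integer) value;
    }
}
```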



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -808,32 +809,12 @@ public void runOnClose(final Runnable runOnClose)
         synchronized (tidy.global)
         {
             final Runnable existing = tidy.runOnClose;
-            tidy.runOnClose = AndThen.get(existing, runOnClose);
-        }
-    }
-
-    private static class AndThen implements Runnable
-    {
-        final Runnable runFirst;
-        final Runnable runSecond;
-
-        private AndThen(Runnable runFirst, Runnable runSecond)
-        {
-            this.runFirst = runFirst;
-            this.runSecond = runSecond;
-        }
-
-        public void run()
-        {
-            runFirst.run();
-            runSecond.run();
-        }
-
-        static Runnable get(Runnable runFirst, Runnable runSecond)
-        {
-            if (runFirst == null)
-                return runSecond;
-            return new AndThen(runFirst, runSecond);
+            tidy.runOnClose = () -> {

Review Comment:
   As far as I remember, `AndThen` is necessary here because a lambda retains a reference to the reader, which we definitely don't want (see the comment at the top of the method).
   
   If you revert this change, please add a "do not replace with a lambda..." comment in front of `AndThen` itself.
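
   As background, a lambda whose body uses an instance field or method of the enclosing object captures `this`, so the enclosing object stays reachable for as long as the `Runnable` does. A static nested class holds only what is passed to its constructor. The sketch below mirrors the removed `AndThen` inside a hypothetical `Owner` class (a stand-in, not `SSTableReader`), with the requested warning comment in place.

```java
final class Owner
{
    private Runnable runOnClose; // hypothetical stand-in for tidy.runOnClose

    // Do not replace with a lambda that touches Owner's state: it would capture
    // `this` and keep the Owner reachable through the Runnable.
    private static final class AndThen implements Runnable
    {
        private final Runnable runFirst;
        private final Runnable runSecond;

        private AndThen(Runnable runFirst, Runnable runSecond)
        {
            this.runFirst = runFirst;
            this.runSecond = runSecond;
        }

        @Override
        public void run()
        {
            runFirst.run();
            runSecond.run();
        }

        static Runnable get(Runnable runFirst, Runnable runSecond)
        {
            return runFirst == null ? runSecond : new AndThen(runFirst, runSecond);
        }
    }

    // Chains the new action after any previously registered one.
    synchronized void runOnClose(Runnable next)
    {
        runOnClose = AndThen.get(runOnClose, next);
    }

    synchronized Runnable onClose()
    {
        return runOnClose;
    }
}
```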



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java:
##########
@@ -36,6 +39,8 @@
     SSTableWriter.Factory getWriterFactory();
     SSTableReader.Factory getReaderFactory();
 
+    Set<Component> supportedComponents();

Review Comment:
   As below, "supported" is not the right term here. Maybe "included"? (I believe we also have "required" for the mandatory ones?)



##########
src/java/org/apache/cassandra/schema/Schema.java:
##########
@@ -224,6 +225,15 @@ public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
                : null;
     }
 
+    public ColumnFamilyStore getColumnFamilyStoreInstance(TableMetadata metadata)

Review Comment:
   The behaviour of this is different from id lookup. Could we give it a different name?



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -1973,17 +1970,36 @@ public void run()
                     if (logger.isTraceEnabled())
                         logger.trace("Async instance tidier for {}, after barrier", descriptor);
 
-                    if (bf != null)
-                        bf.close();
-                    if (summary != null)
-                        summary.close();
-                    if (runOnClose != null)
+                    Throwable exceptions = null;
+                    if (runOnClose != null) try
+                    {
                         runOnClose.run();
-                    if (dfile != null)
-                        dfile.close();
-                    if (ifile != null)
-                        ifile.close();
-                    globalRef.release();
+                    }
+                    catch (RuntimeException | Error ex)
+                    {
+                        logger.error("Failed to run on-close listeners for sstable " + descriptor.baseFilename(), ex);
+                        exceptions = ex;
+                    }
+
+                    Throwable closeExceptions = Throwables.close(null, closeables);

Review Comment:
   Isn't `FileUtils.close` a better match for what is done here?



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java:
##########
@@ -907,43 +907,18 @@ public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable run
             // TODO: merge with caller's firstKeyBeyond() work,to save time
             if (newStart.compareTo(first) > 0)
             {
-                final long dataStart = getPosition(newStart, Operator.EQ);
-                final long indexStart = getIndexScanPosition(newStart);
-                this.tidy.runOnClose = new DropPageCache(dfile, dataStart, ifile, indexStart, runOnClose);
+                Map<FileHandle, Long> handleAndPositions = new LinkedHashMap<>(2);
+                if (dfile != null)
+                    handleAndPositions.put(dfile, getPosition(newStart, Operator.EQ));
+                if (ifile != null)
+                    handleAndPositions.put(ifile, getIndexScanPosition(newStart));
+                runOnClose(() -> handleAndPositions.forEach(FileHandle::dropPageCache));

Review Comment:
   The same comment as in `runOnClose` applies -- this lambda retains a reference to the reader.



##########
src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java:
##########
@@ -374,4 +380,20 @@ public ClusteringPrefix<?> getLowerBoundPrefixFromCache(DecoratedKey partitionKe
         }
 
     }
-}
+
+    @Override
+    public IScrubber getScrubber(LifecycleTransaction transaction, OutputHandler outputHandler, IScrubber.Options options)
+    {
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata());

Review Comment:
   Could we do this without referencing the higher-level CFS and Schema concepts in the sstable reader implementation?
   
   Perhaps implement it in the format and pass the CFS as an argument?



##########
src/java/org/apache/cassandra/db/compaction/Scrubber.java:
##########
@@ -742,6 +750,12 @@ public FixNegativeLocalDeletionTimeIterator(UnfilteredRowIterator iterator, Outp
             this.negativeLocalExpirationTimeMetrics = negativeLocalDeletionInfoMetrics;
         }
 
+        @Override
+        public UnfilteredRowIterator wrapped()
+        {
+            return iterator;
+        }
+

Review Comment:
   Shouldn't we take advantage of the methods already implemented by the wrapping interface, i.e. remove the redundant implementations below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

