You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/09/02 04:57:17 UTC

[GitHub] [cassandra] maedhroz commented on a change in pull request #642: Cassandra 15861 lock trunk

maedhroz commented on a change in pull request #642:
URL: https://github.com/apache/cassandra/pull/642#discussion_r481692234



##########
File path: src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java
##########
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public abstract class SSTableReaderBuilder
+{
+    private static final Logger logger = LoggerFactory.getLogger(SSTableReaderBuilder.class);
+
+    protected final SSTableReader.Factory readerFactory;
+    protected final Descriptor descriptor;
+    protected final TableMetadataRef metadataRef;
+    protected final TableMetadata metadata;
+    protected final long maxDataAge;
+    protected final Set<Component> components;
+    protected final StatsMetadata statsMetadata;
+    protected final SSTableReader.OpenReason openReason;
+    protected final SerializationHeader header;
+
+    protected IndexSummary summary;
+    protected DecoratedKey first;
+    protected DecoratedKey last;
+    protected IFilter bf;
+    protected FileHandle ifile;
+    protected FileHandle dfile;
+
+    public SSTableReaderBuilder(Descriptor descriptor,
+                                TableMetadataRef metadataRef,
+                                long maxDataAge,
+                                Set<Component> components,
+                                StatsMetadata statsMetadata,
+                                SSTableReader.OpenReason openReason,
+                                SerializationHeader header)
+    {
+        this.descriptor = descriptor;
+        this.metadataRef = metadataRef;
+        this.metadata = metadataRef.get();
+        this.maxDataAge = maxDataAge;
+        this.components = components;
+        this.statsMetadata = statsMetadata;
+        this.openReason = openReason;
+        this.header = header;
+        this.readerFactory = descriptor.getFormat().getReaderFactory();
+    }
+
+    public abstract SSTableReader build();
+
+    public SSTableReaderBuilder dfile(FileHandle dfile)
+    {
+        this.dfile = dfile;
+        return this;
+    }
+
+    public SSTableReaderBuilder ifile(FileHandle ifile)
+    {
+        this.ifile = ifile;
+        return this;
+    }
+
+    public SSTableReaderBuilder bf(IFilter bf)
+    {
+        this.bf = bf;
+        return this;
+    }
+
+    public SSTableReaderBuilder summary(IndexSummary summary)
+    {
+        this.summary = summary;
+        return this;
+    }
+
+    /**
+     * Load index summary from Summary.db file if it exists.
+     *
+     * if loaded index summary has different index interval from current value stored in schema,
+     * then Summary.db file will be deleted and this returns false to rebuild summary.
+     */
+    void loadSummary()
+    {
+        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+        if (!summariesFile.exists())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("SSTable Summary File {} does not exist", summariesFile.getAbsolutePath());
+            return;
+        }
+
+        DataInputStream iStream = null;
+        try
+        {
+            iStream = new DataInputStream(Files.newInputStream(summariesFile.toPath()));
+            summary = IndexSummary.serializer.deserialize(iStream,
+                                                          metadata.partitioner,
+                                                          metadata.params.minIndexInterval,
+                                                          metadata.params.maxIndexInterval);
+            first = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+            last = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+        }
+        catch (IOException e)
+        {
+            if (summary != null)
+                summary.close();
+            logger.trace("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+            // corrupted; delete it and fall back to creating a new summary
+            FileUtils.closeQuietly(iStream);
+            // delete it and fall back to creating a new summary
+            FileUtils.deleteWithConfirm(summariesFile);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(iStream);
+        }
+    }
+
+    /**
+     * Build index summary(and optionally bloom filter) by reading through Index.db file.
+     *
+     * @param recreateBloomFilter true if recreate bloom filter
+     * @param summaryLoaded true if index summary is already loaded and not need to build again
+     */
+    void buildSummary(boolean recreateBloomFilter,
+                      boolean summaryLoaded,
+                      Set<Component> components,
+                      StatsMetadata statsMetadata) throws IOException
+    {
+        if (!components.contains(Component.PRIMARY_INDEX))
+            return;
+
+        if (logger.isDebugEnabled())
+            logger.debug("Attempting to build summary for {}", descriptor);
+
+
+        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+        try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
+        {
+            long indexSize = primaryIndex.length();
+            long histogramCount = statsMetadata.estimatedPartitionSize.count();
+            long estimatedKeys = histogramCount > 0 && !statsMetadata.estimatedPartitionSize.isOverflowed()
+                                 ? histogramCount
+                                 : SSTable.estimateRowsFromIndex(primaryIndex, descriptor); // statistics is supposed to be optional
+
+            if (recreateBloomFilter)
+                bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance);
+
+            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL))
+            {
+                long indexPosition;
+
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+                {
+                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                    RowIndexEntry.Serializer.skip(primaryIndex, descriptor.version);
+                    DecoratedKey decoratedKey = metadata.partitioner.decorateKey(key);
+                    if (first == null)
+                        first = decoratedKey;
+                    last = decoratedKey;
+
+                    if (recreateBloomFilter)
+                        bf.add(decoratedKey);
+
+                    // if summary was already read from disk we don't want to re-populate it using primary index
+                    if (!summaryLoaded)
+                    {
+                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+                    }
+                }
+
+                if (!summaryLoaded)
+                    summary = summaryBuilder.build(metadata.partitioner);
+            }
+        }
+
+        first = SSTable.getMinimalKey(first);
+        last = SSTable.getMinimalKey(last);
+    }
+
+    /**
+     * Load bloom filter from Filter.db file.
+     *
+     * @throws IOException
+     */
+    IFilter loadBloomFilter() throws IOException
+    {
+        try (DataInputStream stream = new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(descriptor.filenameFor(Component.FILTER))))))
+        {
+            return BloomFilterSerializer.deserialize(stream, descriptor.version.hasOldBfFormat());
+        }
+    }
+
+    public static class ForWriter extends SSTableReaderBuilder
+    {
+        public ForWriter(Descriptor descriptor,
+                         TableMetadataRef metadataRef,
+                         long maxDataAge,
+                         Set<Component> components,
+                         StatsMetadata statsMetadata,
+                         SSTableReader.OpenReason openReason,
+                         SerializationHeader header)
+        {
+            super(descriptor, metadataRef, maxDataAge, components, statsMetadata, openReason, header);
+        }
+
+        @Override
+        public SSTableReader build()
+        {
+            SSTableReader reader = readerFactory.open(this);
+
+            reader.setup(true);
+            return reader;
+        }
+    }
+
+    public static class ForBatch extends SSTableReaderBuilder
+    {
+        public ForBatch(Descriptor descriptor,
+                        TableMetadataRef metadataRef,
+                        Set<Component> components,
+                        StatsMetadata statsMetadata,
+                        SerializationHeader header)
+        {
+            super(descriptor, metadataRef, System.currentTimeMillis(), components, statsMetadata, SSTableReader.OpenReason.NORMAL, header);
+        }
+
+        @Override
+        public SSTableReader build()
+        {
+            String dataFilePath = descriptor.filenameFor(Component.DATA);
+            long fileLength = new File(dataFilePath).length();
+            logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
+
+            initSummary(dataFilePath, components, statsMetadata);
+
+            boolean compression = components.contains(Component.COMPRESSION_INFO);
+            try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+                    .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                    .withChunkCache(ChunkCache.instance);
+                    FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
+                                                                                                                .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+                                                                                                                .withChunkCache(ChunkCache.instance))
+            {
+                long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+                DiskOptimizationStrategy optimizationStrategy = DatabaseDescriptor.getDiskOptimizationStrategy();
+                int dataBufferSize = optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+                int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / summary.size());
+                ifile = ibuilder.bufferSize(indexBufferSize).complete();
+                dfile = dbuilder.bufferSize(dataBufferSize).complete();
+                bf = FilterFactory.AlwaysPresent;
+
+                SSTableReader sstable = readerFactory.open(this);
+
+                sstable.first = first;

Review comment:
       @dcapwell Yeah, we had some back and forth on this in one of the resolved comments above and ended in pretty much the same place.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org