You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/30 20:48:22 UTC
[04/16] cassandra git commit: Fix Mmapped File Boundaries
Fix Mmapped File Boundaries
This patch fixes two bugs in mmap segment boundary
tracking, and introduces automatic repair of the
incorrectly saved boundaries on startup
patch by benedict; reviewed by tjake for CASSANDRA-10357
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c37562e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c37562e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c37562e3
Branch: refs/heads/trunk
Commit: c37562e345c24720c55428a8644191df68319812
Parents: f6cab37
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Sep 16 18:09:32 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Sep 30 19:45:49 2015 +0100
----------------------------------------------------------------------
.../io/sstable/AbstractSSTableSimpleWriter.java | 2 +
.../cassandra/io/sstable/CQLSSTableWriter.java | 15 +
.../cassandra/io/sstable/SSTableReader.java | 34 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 6 +
.../io/sstable/SSTableSimpleWriter.java | 12 +
.../cassandra/io/util/MappedFileDataInput.java | 8 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 270 +++++++++++++---
.../apache/cassandra/io/util/SegmentedFile.java | 1 +
.../sstable/LongSegmentedFileBoundaryTest.java | 322 +++++++++++++++++++
9 files changed, 601 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 165a4b2..557c3de 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -200,4 +200,6 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException;
protected abstract ColumnFamily getColumnFamily() throws IOException;
+
+ public abstract Descriptor getCurrentDescriptor();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index b211a90..c364171 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -271,6 +271,16 @@ public class CQLSSTableWriter implements Closeable
writer.close();
}
+ public Descriptor getCurrentDescriptor()
+ {
+ return writer.getCurrentDescriptor();
+ }
+
+ public CFMetaData getCFMetaData()
+ {
+ return writer.metadata;
+ }
+
/**
* A Builder for a CQLSSTableWriter object.
*/
@@ -366,6 +376,11 @@ public class CQLSSTableWriter implements Closeable
}
}
+ CFMetaData metadata()
+ {
+ return schema;
+ }
+
/**
* Adds the specified column family to the specified keyspace.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0f307b0..84add6f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -17,13 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
@@ -70,20 +64,14 @@ import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.CompressedSegmentedFile;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.ICompressedFile;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.ActiveRepairService;
@@ -162,6 +150,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
public class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
{
private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
+ private static final int ACCURATE_BOUNDARIES_MAGIC_NUMBER = 248923458;
private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
static
@@ -892,6 +881,19 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
ibuilder.deserializeBounds(iStream);
dbuilder.deserializeBounds(iStream);
+
+ boolean checkForRepair = true;
+ try
+ {
+ int v = iStream.readInt();
+ // check for our magic number, indicating this summary has been sampled correctly
+ checkForRepair = v != ACCURATE_BOUNDARIES_MAGIC_NUMBER;
+ }
+ catch (Throwable t) {}
+
+ // fix CASSANDRA-10357 on-the-fly
+ if (checkForRepair && MmappedSegmentedFile.maybeRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder))
+ saveSummary(ibuilder, dbuilder);
}
catch (IOException e)
{
@@ -992,6 +994,8 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
ByteBufferUtil.writeWithLength(last.getKey(), oStream);
ibuilder.serializeBounds(oStream);
dbuilder.serializeBounds(oStream);
+ // write a magic number, to indicate this summary has been sampled correctly
+ oStream.writeInt(ACCURATE_BOUNDARIES_MAGIC_NUMBER);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 9ee9ea1..25ec354 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -140,6 +140,12 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
return previous;
}
+ public Descriptor getCurrentDescriptor()
+ {
+ // can be implemented, but isn't necessary
+ throw new UnsupportedOperationException();
+ }
+
protected ColumnFamily createColumnFamily() throws IOException
{
return ArrayBackedSortedColumns.factory.create(metadata);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 87c8e33..23da501 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -65,6 +65,13 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
writer = getWriter();
}
+ SSTableReader closeAndOpenReader()
+ {
+ if (currentKey != null)
+ writeRow(currentKey, columnFamily);
+ return writer.closeAndOpenReader();
+ }
+
public void close()
{
try
@@ -89,4 +96,9 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
{
return ArrayBackedSortedColumns.factory.create(metadata);
}
+
+ public Descriptor getCurrentDescriptor()
+ {
+ return writer.descriptor;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
index d056240..f93ce72 100644
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
@@ -51,12 +51,18 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
public void seek(long pos) throws IOException
{
long inSegmentPos = pos - segmentOffset;
- if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
+ if (!contains(pos))
throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
seekInternal((int) inSegmentPos);
}
+ public boolean contains(long pos)
+ {
+ long inSegmentPos = pos - segmentOffset;
+ return inSegmentPos >= 0 && inSegmentPos < buffer.capacity();
+ }
+
public long getFilePointer()
{
return segmentOffset + (long)position;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 1b23343..623f65a 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -24,11 +24,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.JVMStabilityInspector;
public class MmappedSegmentedFile extends SegmentedFile
@@ -135,52 +141,220 @@ public class MmappedSegmentedFile extends SegmentedFile
}
}
+ // see CASSANDRA-10357
+ public static boolean maybeRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ boolean mayNeedRepair = false;
+ if (ibuilder instanceof Builder)
+ mayNeedRepair = ((Builder) ibuilder).mayNeedRepair(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ if (dbuilder instanceof Builder)
+ mayNeedRepair |= ((Builder) dbuilder).mayNeedRepair(descriptor.filenameFor(Component.DATA));
+
+ if (mayNeedRepair)
+ forceRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder);
+ return mayNeedRepair;
+ }
+
+ // if one of the index/data files have boundaries larger than we can mmap, and they were written by a version that did not guarantee correct boundaries were saved,
+ // rebuild the boundaries and save them again
+ private static void forceRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ if (ibuilder instanceof Builder)
+ ((Builder) ibuilder).boundaries.clear();
+ if (dbuilder instanceof Builder)
+ ((Builder) dbuilder).boundaries.clear();
+
+ try (RandomAccessFile raf = new RandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r");)
+ {
+ long iprev = 0, dprev = 0;
+ for (int i = 0; i < indexSummary.size(); i++)
+ {
+ // first read the position in the summary, and read the corresponding position in the data file
+ long icur = indexSummary.getPosition(i);
+ raf.seek(icur);
+ ByteBufferUtil.readWithShortLength(raf);
+ RowIndexEntry rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
+ long dcur = rie.position;
+
+ // if these positions are small enough to map out a segment from the prior version (i.e. less than 2Gb),
+ // just add these as a boundary and proceed to the next index summary record; most scenarios will be
+ // served by this, keeping the cost of rebuild to a minimum.
+
+ if (Math.max(icur - iprev , dcur - dprev) > MAX_SEGMENT_SIZE)
+ {
+ // otherwise, loop over its index block, providing each RIE as a potential boundary for both files
+ raf.seek(iprev);
+ while (raf.getFilePointer() < icur)
+ {
+ // add the position of this record in the index file as an index file boundary
+ ibuilder.addPotentialBoundary(raf.getFilePointer());
+ // then read the RIE, and add its data file position as a boundary for the data file
+ ByteBufferUtil.readWithShortLength(raf);
+ rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
+ dbuilder.addPotentialBoundary(rie.position);
+ }
+ }
+
+ ibuilder.addPotentialBoundary(icur);
+ dbuilder.addPotentialBoundary(dcur);
+
+ iprev = icur;
+ dprev = dcur;
+ }
+ }
+ catch (IOException e)
+ {
+ logger.error("Failed to recalculate boundaries for {}; mmap access may degrade to buffered for this file", descriptor);
+ }
+ }
+
/**
* Overrides the default behaviour to create segments of a maximum size.
*/
- static class Builder extends SegmentedFile.Builder
+ public static class Builder extends SegmentedFile.Builder
{
- // planned segment boundaries
- private List<Long> boundaries;
+ @VisibleForTesting
+ public static class Boundaries
+ {
+ private long[] boundaries;
+
+ // number of boundaries we have "fixed" (i.e. have determined the final value of)
+ private int fixedCount;
+
+ public Boundaries()
+ {
+ // we always have a boundary of zero, so we start with a fixedCount of 1
+ this(new long[8], 1);
+ }
+
+ public Boundaries(long[] boundaries, int fixedCount)
+ {
+ init(boundaries, fixedCount);
+ }
+
+ void init(long[] boundaries, int fixedCount)
+ {
+ this.boundaries = boundaries;
+ this.fixedCount = fixedCount;
+ }
+
+ public void addCandidate(long candidate)
+ {
+ // we make sure we have room before adding another element, so that we can share the addCandidate logic statically
+ boundaries = ensureCapacity(boundaries, fixedCount);
+ fixedCount = addCandidate(boundaries, fixedCount, candidate);
+ }
+
+ private static int addCandidate(long[] boundaries, int fixedCount, long candidate)
+ {
+ // check how far we are from the last fixed boundary
+ long delta = candidate - boundaries[fixedCount - 1];
+ assert delta >= 0;
+ if (delta != 0)
+ {
+ if (delta <= MAX_SEGMENT_SIZE)
+ // overwrite the unfixed (potential) boundary if the resultant segment would still be mmappable
+ boundaries[fixedCount] = candidate;
+ else if (boundaries[fixedCount] == 0)
+ // or, if it is not initialised, we cannot make an mmapped segment here, so this is the fixed boundary
+ boundaries[fixedCount++] = candidate;
+ else
+ // otherwise, fix the prior boundary and initialise our unfixed boundary
+ boundaries[++fixedCount] = candidate;
+ }
+ return fixedCount;
+ }
+
+ // ensures there is room for another fixed boundary AND an unfixed candidate boundary, i.e. fixedCount + 2 items
+ private static long[] ensureCapacity(long[] boundaries, int fixedCount)
+ {
+ if (fixedCount + 1 >= boundaries.length)
+ return Arrays.copyOf(boundaries, boundaries.length * 2);
+ return boundaries;
+ }
+
+ void clear()
+ {
+ fixedCount = 1;
+ Arrays.fill(boundaries, 0);
+ }
+
+ // returns the fixed boundaries, truncated to a correctly sized long[]
+ public long[] truncate()
+ {
+ return Arrays.copyOf(boundaries, fixedCount);
+ }
- // offset of the open segment (first segment begins at 0).
- private long currentStart = 0;
+ // returns the finished boundaries for the provided length, truncated to a correctly sized long[]
+ public long[] finish(long length, boolean isFinal)
+ {
+ assert length > 0;
+ // ensure there's room for the length to be added
+ boundaries = ensureCapacity(boundaries, fixedCount);
+
+ // clone our current contents, so we don't corrupt them
+ int fixedCount = this.fixedCount;
+ long[] boundaries = this.boundaries.clone();
+
+ // if we're finishing early, our length may be before some of our boundaries,
+ // so walk backwards until our boundaries are <= length
+ while (boundaries[fixedCount - 1] >= length)
+ boundaries[fixedCount--] = 0;
+ if (boundaries[fixedCount] >= length)
+ boundaries[fixedCount] = 0;
+
+ // add our length as a boundary
+ fixedCount = addCandidate(boundaries, fixedCount, length);
+
+ // if we have any unfixed boundary at the end, it's now fixed, since we're done
+ if (boundaries[fixedCount] != 0)
+ fixedCount++;
+
+ boundaries = Arrays.copyOf(boundaries, fixedCount);
+ if (isFinal)
+ {
+ // if this is the final one, save it
+ this.boundaries = boundaries;
+ this.fixedCount = fixedCount;
+ }
+ return boundaries;
+ }
+ }
- // current length of the open segment.
- // used to allow merging multiple too-large-to-mmap segments, into a single buffered segment.
- private long currentSize = 0;
+ private final Boundaries boundaries = new Boundaries();
public Builder()
{
super();
- boundaries = new ArrayList<>();
- boundaries.add(0L);
}
- public void addPotentialBoundary(long boundary)
+ public long[] boundaries()
{
- if (boundary - currentStart <= MAX_SEGMENT_SIZE)
- {
- // boundary fits into current segment: expand it
- currentSize = boundary - currentStart;
- return;
- }
+ return boundaries.truncate();
+ }
- // close the current segment to try and make room for the boundary
- if (currentSize > 0)
- {
- currentStart += currentSize;
- boundaries.add(currentStart);
- }
- currentSize = boundary - currentStart;
+ // indicates if we may need to repair the mmapped file boundaries. this is a cheap check to see if there
+ // are any spans larger than an mmap segment size, which should be rare to occur in practice.
+ boolean mayNeedRepair(String path)
+ {
+ // old boundaries were created without the length, so add it as a candidate
+ long length = new File(path).length();
+ boundaries.addCandidate(length);
+ long[] boundaries = this.boundaries.truncate();
- // if we couldn't make room, the boundary needs its own segment
- if (currentSize > MAX_SEGMENT_SIZE)
+ long prev = 0;
+ for (long boundary : boundaries)
{
- currentStart = boundary;
- boundaries.add(currentStart);
- currentSize = 0;
+ if (boundary - prev > MAX_SEGMENT_SIZE)
+ return true;
+ prev = boundary;
}
+ return false;
+ }
+
+ public void addPotentialBoundary(long boundary)
+ {
+ boundaries.addCandidate(boundary);
}
public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
@@ -188,10 +362,10 @@ public class MmappedSegmentedFile extends SegmentedFile
assert !isFinal || overrideLength <= 0;
long length = overrideLength > 0 ? overrideLength : new File(path).length();
// create the segments
- return new MmappedSegmentedFile(path, length, createSegments(path, length));
+ return new MmappedSegmentedFile(path, length, createSegments(path, length, isFinal));
}
- private Segment[] createSegments(String path, long length)
+ private Segment[] createSegments(String path, long length, boolean isFinal)
{
RandomAccessFile raf;
try
@@ -203,27 +377,17 @@ public class MmappedSegmentedFile extends SegmentedFile
throw new RuntimeException(e);
}
- // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
- // we remove these from the end (we loop incase somehow this spans multiple segments, but that would
- // be a loco dataset
- while (length < boundaries.get(boundaries.size() - 1))
- boundaries.remove(boundaries.size() -1);
-
- // add a sentinel value == length
- List<Long> boundaries = new ArrayList<>(this.boundaries);
- if (length != boundaries.get(boundaries.size() - 1))
- boundaries.add(length);
-
+ long[] boundaries = this.boundaries.finish(length, isFinal);
- int segcount = boundaries.size() - 1;
+ int segcount = boundaries.length - 1;
Segment[] segments = new Segment[segcount];
try
{
for (int i = 0; i < segcount; i++)
{
- long start = boundaries.get(i);
- long size = boundaries.get(i + 1) - start;
+ long start = boundaries[i];
+ long size = boundaries[i + 1] - start;
MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
? raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size)
: null;
@@ -245,9 +409,10 @@ public class MmappedSegmentedFile extends SegmentedFile
public void serializeBounds(DataOutput out) throws IOException
{
super.serializeBounds(out);
- out.writeInt(boundaries.size());
- for (long position: boundaries)
- out.writeLong(position);
+ long[] boundaries = this.boundaries.truncate();
+ out.writeInt(boundaries.length);
+ for (long boundary : boundaries)
+ out.writeLong(boundary);
}
@Override
@@ -256,12 +421,11 @@ public class MmappedSegmentedFile extends SegmentedFile
super.deserializeBounds(in);
int size = in.readInt();
- List<Long> temp = new ArrayList<>(size);
-
+ long[] boundaries = new long[size];
for (int i = 0; i < size; i++)
- temp.add(in.readLong());
+ boundaries[i] = in.readLong();
- boundaries = temp;
+ this.boundaries.init(boundaries, size);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index c65ecbf..23454bc 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
import com.google.common.util.concurrent.RateLimiter;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
new file mode 100644
index 0000000..e17c6a7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
@@ -0,0 +1,322 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
+import org.apache.cassandra.io.util.MmappedSegmentedFile.Builder.Boundaries;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class LongSegmentedFileBoundaryTest
+{
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ SchemaLoader.cleanupAndLeaveDirs();
+ Keyspace.setInitialized();
+ StorageService.instance.initServer();
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ Config.setClientMode(false);
+ }
+
+ @Test
+ public void testRandomBoundaries()
+ {
+ long[] candidates = new long[1 + (1 << 16)];
+ int[] indexesToCheck = new int[1 << 8];
+ Random random = new Random();
+
+ for (int run = 0; run < 100; run++)
+ {
+
+ long seed = random.nextLong();
+ random.setSeed(seed);
+ System.out.println("Seed: " + seed);
+
+ // at least 1Ki, and as many as 64Ki, boundaries
+ int candidateCount = (1 + random.nextInt(candidates.length >> 10)) << 10;
+ generateBoundaries(random, candidateCount, candidates, indexesToCheck);
+
+ Boundaries builder = new Boundaries();
+ int nextIndexToCheck = indexesToCheck[0];
+ int checkCount = 0;
+ System.out.printf("[0..%d)", candidateCount);
+ for (int i = 1; i < candidateCount - 1; i++)
+ {
+ if (i == nextIndexToCheck)
+ {
+ if (checkCount % 20 == 0)
+ System.out.printf(" %d", i);
+ // grow number of samples logarithmically; work will still increase superlinearly, as size of dataset grows linearly
+ int sampleCount = 1 << (31 - Integer.numberOfLeadingZeros(++checkCount));
+ checkBoundarySample(random, candidates, i, sampleCount, builder);
+ // select out next index to check (there may be dups, so skip them)
+ while ((nextIndexToCheck = checkCount == indexesToCheck.length ? candidateCount : indexesToCheck[checkCount]) == i)
+ checkCount++;
+ }
+
+ builder.addCandidate(candidates[i]);
+ }
+ System.out.println();
+ checkBoundaries(candidates, candidateCount - 1, builder, candidates[candidateCount - 1]);
+ Assert.assertEquals(candidateCount, nextIndexToCheck);
+ }
+ }
+
+ private static void generateBoundaries(Random random, int candidateCount, long[] candidates, int[] indexesToCheck)
+ {
+ // averageBoundarySize is a multiple of 4KiB drawn uniformly from [0, 4GiB), i.e. ~2GiB on average
+ long averageBoundarySize = (4L << 10) * random.nextInt(1 << 20);
+ long prev = 0;
+ for (int i = 1 ; i < candidateCount ; i++)
+ candidates[i] = prev += Math.max(1, averageBoundarySize + (random.nextGaussian() * averageBoundarySize));
+
+ // generate indexes we will corroborate our behaviour on
+ for (int i = 0 ; i < indexesToCheck.length ; i++)
+ indexesToCheck[i] = 1 + random.nextInt(candidateCount - 2);
+ Arrays.sort(indexesToCheck);
+ }
+
+ private static void checkBoundarySample(Random random, long[] candidates, int candidateCount, int sampleCount, Boundaries builder)
+ {
+ for (int i = 0 ; i < sampleCount ; i++)
+ {
+ // pick a number exponentially less likely to be near the beginning, since we test that area earlier
+ int position = 0 ;
+ while (position <= 0)
+ position = candidateCount / (Integer.lowestOneBit(random.nextInt()));
+ long upperBound = candidates[position];
+ long lowerBound = random.nextBoolean() ? (rand(random, 0, upperBound) / (Integer.lowestOneBit(random.nextInt())))
+ : candidates[Math.max(0, position - random.nextInt(64))];
+ long length = rand(random, lowerBound, upperBound);
+ checkBoundaries(candidates, candidateCount, builder, length);
+ }
+ checkBoundaries(candidates, candidateCount, builder, candidates[candidateCount]);
+ }
+
+ private static long rand(Random random, long lowerBound, long upperBound)
+ {
+ if (upperBound == lowerBound)
+ return upperBound;
+ return lowerBound + ((random.nextLong() & Long.MAX_VALUE) % (upperBound - lowerBound));
+ }
+
+ private static void checkBoundaries(long[] candidates, int candidateCount, Boundaries builder, long length)
+ {
+ if (length == 0)
+ return;
+
+ long[] boundaries = new long[(int) (10 + 2 * (length / Integer.MAX_VALUE))];
+ int count = 1;
+ int prev = 0;
+ while (true)
+ {
+ int p = candidates[prev + 1] - boundaries[count - 1] >= Integer.MAX_VALUE
+ ? prev + 1
+ : Arrays.binarySearch(candidates, prev, candidateCount, boundaries[count - 1] + Integer.MAX_VALUE);
+ if (p < 0) p = -2 -p;
+ if (p >= candidateCount - 1 || candidates[p] >= length)
+ break;
+ boundaries[count++] = candidates[p];
+ if (candidates[p + 1] >= length)
+ break;
+ prev = p;
+ }
+ if (candidates[candidateCount - 1] < length && length - boundaries[count - 1] >= Integer.MAX_VALUE)
+ boundaries[count++] = candidates[candidateCount - 1];
+ boundaries[count++] = length;
+ final long[] canon = Arrays.copyOf(boundaries, count);
+ final long[] check = builder.finish(length, false);
+ if (!Arrays.equals(canon, check))
+ Assert.assertTrue("\n" + Arrays.toString(canon) + "\n" + Arrays.toString(check), Arrays.equals(canon, check));
+ }
+
+ @Test
+ public void testBoundariesAndRepairSmall() throws InvalidRequestException, IOException
+ {
+ testBoundariesAndRepair(1, 1 << 16);
+ }
+
+ @Test
+ public void testBoundariesAndRepairMedium() throws InvalidRequestException, IOException
+ {
+ testBoundariesAndRepair(1, 1 << 20);
+ }
+
+ @Test
+ public void testBoundariesAndRepairLarge() throws InvalidRequestException, IOException
+ {
+ testBoundariesAndRepair(1, 100 << 20);
+ }
+
+ @Test
+ public void testBoundariesAndRepairHuge() throws InvalidRequestException, IOException
+ {
+ testBoundariesAndRepair(1, Integer.MAX_VALUE - 1024);
+ }
+
+ @Test
+ public void testBoundariesAndRepairTooHuge() throws InvalidRequestException, IOException
+ {
+ testBoundariesAndRepair(1, Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void testBoundariesAndRepairHugeIndex() throws InvalidRequestException, IOException
+ {
+ testBoundariesAndRepair(1 << 7, 1 << 15);
+ }
+
+ @Test
+ public void testBoundariesAndRepairReallyHugeIndex() throws InvalidRequestException, IOException
+ {
+ testBoundariesAndRepair(1 << 14, 1 << 15);
+ }
+
+ private void testBoundariesAndRepair(int rows, int rowSize) throws InvalidRequestException, IOException
+ {
+ String KS = "cql_keyspace";
+ String TABLE = "table1";
+
+ File tempdir = Files.createTempDir();
+ try
+ {
+ Assert.assertTrue(DatabaseDescriptor.getColumnIndexSize() < rowSize);
+ Assert.assertTrue(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
+ Assert.assertTrue(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
+ Assert.assertTrue(StorageService.getPartitioner() instanceof ByteOrderedPartitioner);
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ Assert.assertTrue(dataDir.mkdirs());
+
+ String schema = "CREATE TABLE cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k bigint, v1 blob, v2 blob, v3 blob, v4 blob, v5 blob, PRIMARY KEY (k" + (rows > 1 ? ", v1" : "") + ")) WITH compression = { 'sstable_compression':'' };";
+ String insert = "INSERT INTO cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k, v1, v2, v3, v4, v5) VALUES (?, ?, ?, ?, ?, ?)";
+
+ CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .withPartitioner(StorageService.getPartitioner())
+ .using(insert)
+ .sorted();
+ CQLSSTableWriter writer = builder.build();
+
+ // write 8Gb of decorated keys
+ ByteBuffer[] value = new ByteBuffer[rows];
+ for (int row = 0 ; row < rows ; row++)
+ {
+ // if we're using clustering columns, the clustering key is replicated across every other column
+ value[row] = ByteBuffer.allocate(rowSize / (rows > 1 ? 8 : 5));
+ value[row].putInt(0, row);
+ }
+ long targetSize = 8L << 30;
+ long dk = 0;
+ long size = 0;
+ long dkSize = rowSize * rows;
+ while (size < targetSize)
+ {
+ for (int row = 0 ; row < rows ; row++)
+ writer.addRow(dk, value[row], value[row], value[row], value[row], value[row]);
+ size += dkSize;
+ dk++;
+ }
+
+ Descriptor descriptor = writer.getCurrentDescriptor().asType(Descriptor.Type.FINAL);
+ writer.close();
+
+ // open (and close) the reader so that the summary file is created
+ SSTableReader reader = SSTableReader.open(descriptor);
+ reader.selfRef().release();
+
+ // then check the boundaries are reasonable, and corrupt them
+ checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+
+ // then check that reopening corrects the corruption
+ reader = SSTableReader.open(descriptor);
+ reader.selfRef().release();
+ checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+ }
+ finally
+ {
+ FileUtils.deleteRecursive(tempdir);
+ }
+ }
+
+ /**
+  * Reads the summary component of the given sstable, asserts that the index and data segment boundaries
+  * it records match the expected mmappability, and then rewrites the summary with a replacement boundary
+  * section so that a subsequent open has corruption to correct.
+  *
+  * @param descriptor          identifies the sstable whose SUMMARY, PRIMARY_INDEX and DATA components are used
+  * @param expectDataMmappable expectation passed to assertBoundaries for the data file's segments
+  * @throws IOException if the summary file cannot be read or rewritten
+  */
+ private static void checkThenCorruptBoundaries(Descriptor descriptor, boolean expectDataMmappable) throws IOException
+ {
+     File summaryFile = new File(descriptor.filenameFor(Component.SUMMARY));
+     IndexSummary indexSummary;
+     ByteBuffer first;
+     ByteBuffer last;
+     MmappedSegmentedFile.Builder ibuilder = new MmappedSegmentedFile.Builder();
+     MmappedSegmentedFile.Builder dbuilder = new MmappedSegmentedFile.Builder();
+     DataInputStream iStream = new DataInputStream(new FileInputStream(summaryFile));
+     try
+     {
+         indexSummary = IndexSummary.serializer.deserialize(iStream, StorageService.getPartitioner(), true, CFMetaData.DEFAULT_MIN_INDEX_INTERVAL, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL);
+         first = ByteBufferUtil.readWithLength(iStream);
+         last = ByteBufferUtil.readWithLength(iStream);
+         ibuilder.deserializeBounds(iStream);
+         dbuilder.deserializeBounds(iStream);
+     }
+     finally
+     {
+         // release the file handle even if deserialization fails
+         iStream.close();
+     }
+     // index file cannot generally be non-mmappable, as index entries cannot be larger than MAX_SEGMENT_SIZE (due to promotedSize being encoded as an int)
+     assertBoundaries(descriptor.filenameFor(Component.PRIMARY_INDEX), true, ibuilder.boundaries());
+     assertBoundaries(descriptor.filenameFor(Component.DATA), expectDataMmappable, dbuilder.boundaries());
+
+     DataOutputStreamPlus oStream = new DataOutputStreamPlus(new FileOutputStream(summaryFile));
+     try
+     {
+         // the summary, first key and last key are written back unchanged
+         IndexSummary.serializer.serialize(indexSummary, oStream, true);
+         ByteBufferUtil.writeWithLength(first, oStream);
+         ByteBufferUtil.writeWithLength(last, oStream);
+         // NOTE(review): these three values replace the bounds consumed by deserializeBounds above; the
+         // on-disk layout they are meant to imitate is defined by MmappedSegmentedFile.Builder - confirm there
+         oStream.writeInt(1);
+         oStream.writeLong(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length());
+         oStream.writeLong(new File(descriptor.filenameFor(Component.DATA)).length());
+     }
+     finally
+     {
+         // release the file handle even if the rewrite fails
+         oStream.close();
+     }
+ }
+
+ /**
+  * Walks consecutive segment boundaries of the file at the given path and asserts, for every segment,
+  * that "the segment spans at most Integer.MAX_VALUE bytes" equals expectMmappable. The end of the last
+  * segment is taken to be the file's length, and the walk stops once a boundary reaches that length.
+  *
+  * @param path            path of the file the boundaries refer to
+  * @param expectMmappable whether each segment is expected to span at most Integer.MAX_VALUE bytes
+  * @param boundaries      segment start offsets; must contain at least one entry
+  */
+ private static void assertBoundaries(String path, boolean expectMmappable, long[] boundaries)
+ {
+     // fail with a readable message rather than an ArrayIndexOutOfBoundsException on boundaries[0]
+     Assert.assertTrue("no boundaries recorded for " + path, boundaries.length > 0);
+     long length = new File(path).length();
+     long prev = boundaries[0];
+     for (int i = 1 ; i <= boundaries.length && prev < length ; i++)
+     {
+         long boundary = i == boundaries.length ? length : boundaries[i];
+         // the segment is reported as [start, end), i.e. prev first and boundary second
+         Assert.assertEquals(String.format("[%d, %d), %d of %d", prev, boundary, i, boundaries.length),
+                             expectMmappable, boundary - prev <= Integer.MAX_VALUE);
+         prev = boundary;
+     }
+ }
+
+}