Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/23 16:29:43 UTC
[2/4] Preemptive open of compaction results
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 9d7729b..8cd8c9f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -17,14 +17,29 @@
*/
package org.apache.cassandra.io.compress;
-import java.io.*;
-import java.util.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -45,6 +60,7 @@ public class CompressionMetadata
public final long compressedFileLength;
public final boolean hasPostCompressionAdlerChecksums;
private final Memory chunkOffsets;
+ private final long chunkOffsetsSize;
public final String indexFilePath;
public final CompressionParameters parameters;
@@ -85,7 +101,7 @@ public class CompressionMetadata
{
String compressorName = stream.readUTF();
int optionCount = stream.readInt();
- Map<String, String> options = new HashMap<String, String>();
+ Map<String, String> options = new HashMap<>();
for (int i = 0; i < optionCount; ++i)
{
String key = stream.readUTF();
@@ -114,6 +130,19 @@ public class CompressionMetadata
{
FileUtils.closeQuietly(stream);
}
+ this.chunkOffsetsSize = chunkOffsets.size();
+ }
+
+ private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+ {
+ this.indexFilePath = filePath;
+ this.parameters = parameters;
+ this.dataLength = dataLength;
+ this.compressedFileLength = compressedLength;
+ this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+ this.chunkOffsets = offsets;
+ offsets.reference();
+ this.chunkOffsetsSize = offsetsSize;
}
public ICompressor compressor()
@@ -173,7 +202,7 @@ public class CompressionMetadata
// position of the chunk
int idx = 8 * (int) (position / parameters.chunkLength());
- if (idx >= chunkOffsets.size())
+ if (idx >= chunkOffsetsSize)
throw new CorruptSSTableException(new EOFException(), indexFilePath);
long chunkOffset = chunkOffsets.getLong(idx);
@@ -207,7 +236,7 @@ public class CompressionMetadata
{
long offset = i * 8;
long chunkOffset = chunkOffsets.getLong(offset);
- long nextChunkOffset = offset + 8 == chunkOffsets.size()
+ long nextChunkOffset = offset + 8 == chunkOffsetsSize
? compressedFileLength
: chunkOffsets.getLong(offset + 8);
offsets.add(new Chunk(chunkOffset, (int) (nextChunkOffset - chunkOffset - 4))); // "4" bytes reserved for checksum
@@ -218,52 +247,60 @@ public class CompressionMetadata
public void close()
{
- chunkOffsets.free();
+ if (chunkOffsets instanceof RefCountedMemory)
+ ((RefCountedMemory) chunkOffsets).unreference();
+ else
+ chunkOffsets.free();
}
- public static class Writer extends RandomAccessFile
+ public static class Writer
{
- // place for uncompressed data length in the index file
- private long dataLengthOffset = -1;
// path to the file
+ private final CompressionParameters parameters;
private final String filePath;
+ private int maxCount = 100;
+ private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8);
+ private int count = 0;
- private Writer(String path) throws FileNotFoundException
+ private Writer(CompressionParameters parameters, String path)
{
- super(path, "rw");
+ this.parameters = parameters;
filePath = path;
}
- public static Writer open(String path)
+ public static Writer open(CompressionParameters parameters, String path)
{
- try
- {
- return new Writer(path);
- }
- catch (FileNotFoundException e)
+ return new Writer(parameters, path);
+ }
+
+ public void addOffset(long offset)
+ {
+ if (count == maxCount)
{
- throw new RuntimeException(e);
+ RefCountedMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
+ offsets.unreference();
+ offsets = newOffsets;
}
+ offsets.setLong(8 * count++, offset);
}
- public void writeHeader(CompressionParameters parameters)
+ private void writeHeader(DataOutput out, long dataLength, int chunks)
{
try
{
- writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
- writeInt(parameters.otherOptions.size());
+ out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+ out.writeInt(parameters.otherOptions.size());
for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
{
- writeUTF(entry.getKey());
- writeUTF(entry.getValue());
+ out.writeUTF(entry.getKey());
+ out.writeUTF(entry.getValue());
}
// store the length of the chunk
- writeInt(parameters.chunkLength());
+ out.writeInt(parameters.chunkLength());
// store position and reserve a place for uncompressed data length and chunks count
- dataLengthOffset = getFilePointer();
- writeLong(-1);
- writeInt(-1);
+ out.writeLong(dataLength);
+ out.writeInt(chunks);
}
catch (IOException e)
{
@@ -271,36 +308,16 @@ public class CompressionMetadata
}
}
- public void finalizeHeader(long dataLength, int chunks)
+ public CompressionMetadata openEarly(long dataLength, long compressedLength)
{
- assert dataLengthOffset != -1 : "writeHeader wasn't called";
-
- long currentPosition;
- try
- {
- currentPosition = getFilePointer();
- }
- catch (IOException e)
- {
- throw new FSReadError(e, filePath);
- }
-
- try
- {
- // seek back to the data length position
- seek(dataLengthOffset);
-
- // write uncompressed data length and chunks count
- writeLong(dataLength);
- writeInt(chunks);
+ return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
+ }
- // seek forward to the previous position
- seek(currentPosition);
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, filePath);
- }
+ public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
+ {
+ RefCountedMemory newOffsets = offsets.copy(count * 8L);
+ offsets.unreference();
+ return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
}
/**
@@ -312,33 +329,7 @@ public class CompressionMetadata
*/
public long chunkOffsetBy(int chunkIndex)
{
- if (dataLengthOffset == -1)
- throw new IllegalStateException("writeHeader wasn't called");
-
- try
- {
- long position = getFilePointer();
-
- // seek to the position of the given chunk
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
-
- try
- {
- return readLong();
- }
- finally
- {
- // back to the original position
- seek(position);
- }
- }
- catch (IOException e)
- {
- throw new FSReadError(e, filePath);
- }
+ return offsets.getLong(chunkIndex * 8);
}
/**
@@ -347,25 +338,17 @@ public class CompressionMetadata
*/
public void resetAndTruncate(int chunkIndex)
{
- try
- {
- seek(dataLengthOffset
- + 8 // size reserved for uncompressed data length
- + 4 // size reserved for chunk count
- + (chunkIndex * 8L));
- getChannel().truncate(getFilePointer());
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, filePath);
- }
+ count = chunkIndex;
}
- public void close() throws IOException
+ public void close(long dataLength, int chunks) throws IOException
{
- if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
- getChannel().force(true);
- super.close();
+ final DataOutputStream out = new DataOutputStream(new FileOutputStream(filePath));
+ assert chunks == count;
+ writeHeader(out, dataLength, chunks);
+ for (int i = 0 ; i < count ; i++)
+ out.writeLong(offsets.getLong(i * 8));
+ out.close();
}
}
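The Writer above no longer extends RandomAccessFile: chunk offsets now accumulate in ref-counted off-heap memory and the index file is written exactly once, at close. A minimal on-heap sketch of the same grow-by-doubling bookkeeping (hypothetical class, with a plain long[] standing in for RefCountedMemory):

// Hypothetical stand-in for the new Writer's offset bookkeeping; a plain
// long[] replaces RefCountedMemory, so reference counting is not shown.
public class OffsetLog
{
    private long[] offsets = new long[100]; // mirrors maxCount = 100
    private int count = 0;

    public void addOffset(long offset)
    {
        if (count == offsets.length) // grow by doubling, as in offsets.copy((maxCount *= 2) * 8)
            offsets = java.util.Arrays.copyOf(offsets, offsets.length * 2);
        offsets[count++] = offset;
    }

    public long chunkOffsetBy(int chunkIndex)
    {
        return offsets[chunkIndex]; // the patch reads offsets.getLong(chunkIndex * 8)
    }

    public void resetAndTruncate(int chunkIndex)
    {
        count = chunkIndex; // truncation is now a counter reset, not a file truncate
    }
}

Because the offsets stay in memory until close(), openEarly() can snapshot them into a CompressionMetadata without touching the half-written index file.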
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 5d20652..7e7b364 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -83,7 +83,7 @@ public abstract class AbstractSSTableSimpleWriter
int maxGen = 0;
for (Descriptor desc : existing)
maxGen = Math.max(maxGen, desc.generation);
- return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, true).filenameFor(Component.DATA);
+ return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP).filenameFor(Component.DATA);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 18609bf..b42abf4 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -122,29 +122,41 @@ public class Descriptor
}
}
+ public static enum Type
+ {
+ TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
+ public final boolean isTemporary;
+ public final String marker;
+ Type(String marker, boolean isTemporary)
+ {
+ this.isTemporary = isTemporary;
+ this.marker = marker;
+ }
+ }
+
public final File directory;
/** version has the following format: <code>[a-z]+</code> */
public final Version version;
public final String ksname;
public final String cfname;
public final int generation;
- public final boolean temporary;
+ public final Type type;
private final int hashCode;
/**
* A descriptor that assumes CURRENT_VERSION.
*/
- public Descriptor(File directory, String ksname, String cfname, int generation, boolean temp)
+ public Descriptor(File directory, String ksname, String cfname, int generation, Type temp)
{
this(Version.CURRENT, directory, ksname, cfname, generation, temp);
}
- public Descriptor(String version, File directory, String ksname, String cfname, int generation, boolean temp)
+ public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp)
{
this(new Version(version), directory, ksname, cfname, generation, temp);
}
- public Descriptor(Version version, File directory, String ksname, String cfname, int generation, boolean temp)
+ public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp)
{
assert version != null && directory != null && ksname != null && cfname != null;
this.version = version;
@@ -152,13 +164,13 @@ public class Descriptor
this.ksname = ksname;
this.cfname = cfname;
this.generation = generation;
- temporary = temp;
+ type = temp;
hashCode = Objects.hashCode(directory, generation, ksname, cfname, temp);
}
public Descriptor withGeneration(int newGeneration)
{
- return new Descriptor(version, directory, ksname, cfname, newGeneration, temporary);
+ return new Descriptor(version, directory, ksname, cfname, newGeneration, type);
}
public String filenameFor(Component component)
@@ -172,8 +184,8 @@ public class Descriptor
buff.append(directory).append(File.separatorChar);
buff.append(ksname).append(separator);
buff.append(cfname).append(separator);
- if (temporary)
- buff.append(SSTable.TEMPFILE_MARKER).append(separator);
+ if (type.isTemporary)
+ buff.append(type.marker).append(separator);
buff.append(version).append(separator);
buff.append(generation);
return buff.toString();
@@ -231,10 +243,15 @@ public class Descriptor
// optional temporary marker
nexttok = st.nextToken();
- boolean temporary = false;
- if (nexttok.equals(SSTable.TEMPFILE_MARKER))
+ Type type = Type.FINAL;
+ if (nexttok.equals(Type.TEMP.marker))
+ {
+ type = Type.TEMP;
+ nexttok = st.nextToken();
+ }
+ else if (nexttok.equals(Type.TEMPLINK.marker))
{
- temporary = true;
+ type = Type.TEMPLINK;
nexttok = st.nextToken();
}
@@ -250,16 +267,16 @@ public class Descriptor
if (!skipComponent)
component = st.nextToken();
directory = directory != null ? directory : new File(".");
- return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, temporary), component);
+ return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, type), component);
}
/**
- * @param temporary temporary flag
+ * @param type the descriptor type to use for the clone
* @return A clone of this descriptor with the given 'temporary' status.
*/
- public Descriptor asTemporary(boolean temporary)
+ public Descriptor asType(Type type)
{
- return new Descriptor(version, directory, ksname, cfname, generation, temporary);
+ return new Descriptor(version, directory, ksname, cfname, generation, type);
}
public IMetadataSerializer getMetadataSerializer()
@@ -296,7 +313,7 @@ public class Descriptor
&& that.generation == this.generation
&& that.ksname.equals(this.ksname)
&& that.cfname.equals(this.cfname)
- && that.temporary == this.temporary;
+ && that.type == this.type;
}
@Override
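The boolean 'temporary' flag becomes a three-valued Type so the hard links created during early open (tmplink) can be told apart from ordinary temporaries (tmp). The parsing half reduces to a token match on the filename marker; a standalone sketch (fromToken() is a hypothetical helper condensing the branch added to the filename parser):

// Standalone mirror of Descriptor.Type with a hypothetical fromToken() helper.
enum Type
{
    TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);

    final String marker;
    final boolean isTemporary;

    Type(String marker, boolean isTemporary)
    {
        this.marker = marker;
        this.isTemporary = isTemporary;
    }

    static Type fromToken(String token)
    {
        if (TEMP.marker.equals(token))
            return TEMP;
        if (TEMPLINK.marker.equals(token))
            return TEMPLINK;
        return FINAL; // no marker token in the filename means a final sstable
    }
}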
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0696fb7..f53a7e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -25,11 +25,11 @@ import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.Memory;
import org.apache.cassandra.io.util.MemoryOutputStream;
import org.apache.cassandra.utils.FBUtilities;
@@ -60,7 +60,7 @@ public class IndexSummary implements Closeable
private final IPartitioner partitioner;
private final int summarySize;
private final int sizeAtFullSampling;
- private final Memory bytes;
+ private final RefCountedMemory bytes;
/**
* A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -70,7 +70,7 @@ public class IndexSummary implements Closeable
*/
private final int samplingLevel;
- public IndexSummary(IPartitioner partitioner, Memory memory, int summarySize, int sizeAtFullSampling,
+ public IndexSummary(IPartitioner partitioner, RefCountedMemory memory, int summarySize, int sizeAtFullSampling,
int minIndexInterval, int samplingLevel)
{
this.partitioner = partitioner;
@@ -251,7 +251,7 @@ public class IndexSummary implements Closeable
" the current max index interval (%d)", effectiveIndexInterval, maxIndexInterval));
}
- Memory memory = Memory.allocate(offheapSize);
+ RefCountedMemory memory = new RefCountedMemory(offheapSize);
FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
}
@@ -260,6 +260,12 @@ public class IndexSummary implements Closeable
@Override
public void close()
{
- bytes.free();
+ bytes.unreference();
+ }
+
+ public IndexSummary readOnlyClone()
+ {
+ bytes.reference();
+ return this;
}
}
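readOnlyClone() works because the summary's off-heap bytes are now reference counted: every clone bumps the count, every close() drops it, and the memory is freed only when the count reaches zero. A minimal sketch of the assumed RefCountedMemory semantics:

// Assumed semantics of RefCountedMemory, sketched with an AtomicInteger;
// the real class wraps an off-heap allocation rather than a stub free().
class RefCountedSketch
{
    private final java.util.concurrent.atomic.AtomicInteger refs =
            new java.util.concurrent.atomic.AtomicInteger(1); // creator holds the first reference

    void reference() { refs.incrementAndGet(); }

    void unreference()
    {
        if (refs.decrementAndGet() == 0)
            free(); // the last holder releases the underlying memory
    }

    private void free() { /* release the off-heap allocation */ }
}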
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index d77e887..8580dce 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,15 +17,17 @@
*/
package org.apache.cassandra.io.sstable;
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.RefCountedMemory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
@@ -34,7 +36,7 @@ public class IndexSummaryBuilder
private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
private final ArrayList<Long> positions;
- private final ArrayList<byte[]> keys;
+ private final ArrayList<DecoratedKey> keys;
private final int minIndexInterval;
private final int samplingLevel;
private final int[] startPoints;
@@ -69,6 +71,27 @@ public class IndexSummaryBuilder
keys = new ArrayList<>((int)maxExpectedEntries);
}
+ // finds the last (-offset) decorated key that can be guaranteed to occur fully in the index file before the provided file position
+ public DecoratedKey getMaxReadableKey(long position, int offset)
+ {
+ int i = Collections.binarySearch(positions, position);
+ if (i < 0)
+ {
+ i = -1 - i;
+ if (i == positions.size())
+ i -= 2;
+ else
+ i -= 1;
+ }
+ else
+ i -= 1;
+ i -= offset;
+ // we don't want to return any key if there's only 1 item in the summary, to make sure the sstable range is non-empty
+ if (i <= 0)
+ return null;
+ return keys.get(i);
+ }
+
public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
{
if (keysWritten % minIndexInterval == 0)
@@ -86,9 +109,8 @@ public class IndexSummaryBuilder
if (!shouldSkip)
{
- byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
- keys.add(key);
- offheapSize += key.length;
+ keys.add(decoratedKey);
+ offheapSize += decoratedKey.key.remaining();
positions.add(indexPosition);
offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
}
@@ -102,32 +124,51 @@ public class IndexSummaryBuilder
public IndexSummary build(IPartitioner partitioner)
{
+ return build(partitioner, null);
+ }
+
+ public IndexSummary build(IPartitioner partitioner, DecoratedKey exclusiveUpperBound)
+ {
assert keys.size() > 0;
assert keys.size() == positions.size();
+ int length;
+ if (exclusiveUpperBound == null)
+ length = keys.size();
+ else
+ length = Collections.binarySearch(keys, exclusiveUpperBound);
+
+ assert length > 0;
+
+ long offheapSize = this.offheapSize;
+ if (length < keys.size())
+ for (int i = length ; i < keys.size() ; i++)
+ offheapSize -= keys.get(i).key.remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
+
// first we write out the position in the *summary* for each key in the summary,
// then we write out (key, actual index position) pairs
- Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));
+ RefCountedMemory memory = new RefCountedMemory(offheapSize + (length * 4));
int idxPosition = 0;
- int keyPosition = keys.size() * 4;
- for (int i = 0; i < keys.size(); i++)
+ int keyPosition = length * 4;
+ for (int i = 0; i < length; i++)
{
// write the position of the actual entry in the index summary (4 bytes)
memory.setInt(idxPosition, keyPosition);
idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
// write the key
- byte[] keyBytes = keys.get(i);
- memory.setBytes(keyPosition, keyBytes, 0, keyBytes.length);
- keyPosition += keyBytes.length;
+ ByteBuffer keyBytes = keys.get(i).key;
+ memory.setBytes(keyPosition, keyBytes);
+ keyPosition += keyBytes.remaining();
// write the position in the actual index file
long actualIndexPosition = positions.get(i);
memory.setLong(keyPosition, actualIndexPosition);
keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
}
+ assert keyPosition == offheapSize + (length * 4);
int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
- return new IndexSummary(partitioner, memory, keys.size(), sizeAtFullSampling, minIndexInterval, samplingLevel);
+ return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval, samplingLevel);
}
public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -190,7 +231,7 @@ public class IndexSummaryBuilder
// Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
// stores the position of the actual entries in the summary.
- Memory memory = Memory.allocate(newOffHeapSize - (removedKeyCount * 4));
+ RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount * 4));
// Copy old entries to our new Memory.
int idxPosition = 0;
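getMaxReadableKey() leans on Collections.binarySearch over the recorded index positions, and the index arithmetic is easy to misread; here is the same bookkeeping isolated over a plain List<Long> (hypothetical helper that returns an index instead of a key):

// Hypothetical isolation of getMaxReadableKey()'s binary-search arithmetic;
// returns the index of the last entry guaranteed fully before 'flushedTo', or -1.
static int maxReadableIndex(java.util.List<Long> positions, long flushedTo, int offset)
{
    int i = java.util.Collections.binarySearch(positions, flushedTo);
    if (i < 0)
    {
        i = -1 - i;                           // recover the insertion point
        i -= (i == positions.size()) ? 2 : 1; // step back; two if we ran off the end, since the last entry may be partial
    }
    else
    {
        i -= 1;                               // an exact hit may itself be only partially flushed
    }
    i -= offset;                              // callers can ask for earlier keys (offset > 0)
    return i <= 0 ? -1 : i;                   // never the first entry, so the sstable range stays non-empty
}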
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 5ee9bdb..247343e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -56,8 +56,6 @@ public abstract class SSTable
{
static final Logger logger = LoggerFactory.getLogger(SSTable.class);
- public static final String TEMPFILE_MARKER = "tmp";
-
public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
public final Descriptor descriptor;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index c330c88..7b9d135 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -79,7 +79,7 @@ public class SSTableLoader implements StreamEventHandler
return false;
Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
Descriptor desc = p == null ? null : p.left;
- if (p == null || !p.right.equals(Component.DATA) || desc.temporary)
+ if (p == null || !p.right.equals(Component.DATA) || desc.type.isTemporary)
return false;
if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8e359bd..c84eec2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -193,6 +193,7 @@ public class SSTableReader extends SSTable
private SSTableReader replacedBy;
private SSTableReader replaces;
private SSTableDeletingTask deletingTask;
+ private Runnable runOnClose;
@VisibleForTesting
public RestorableMeter readMeter;
@@ -340,7 +341,7 @@ public class SSTableReader extends SSTable
// special implementation of load to use non-pooled SegmentedFile builders
SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
SegmentedFile.Builder dbuilder = sstable.compression
- ? new CompressedSegmentedFile.Builder()
+ ? new CompressedSegmentedFile.Builder(null)
: new BufferedSegmentedFile.Builder();
if (!sstable.loadSummary(ibuilder, dbuilder))
sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
@@ -555,7 +556,7 @@ public class SSTableReader extends SSTable
synchronized (replaceLock)
{
- boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFile = false;
+ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
if (replacedBy != null)
{
@@ -563,7 +564,7 @@ public class SSTableReader extends SSTable
closeSummary = replacedBy.indexSummary != indexSummary;
closeFiles = replacedBy.dfile != dfile;
// if the replacement sstablereader uses a different path, clean up our paths
- deleteFile = !dfile.path.equals(replacedBy.dfile.path);
+ deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
}
if (replaces != null)
@@ -571,7 +572,7 @@ public class SSTableReader extends SSTable
closeBf &= replaces.bf != bf;
closeSummary &= replaces.indexSummary != indexSummary;
closeFiles &= replaces.dfile != dfile;
- deleteFile &= !dfile.path.equals(replaces.dfile.path);
+ deleteFiles &= !dfile.path.equals(replaces.dfile.path);
}
boolean deleteAll = false;
@@ -597,12 +598,15 @@ public class SSTableReader extends SSTable
replacedBy.replaces = replaces;
}
- scheduleTidy(closeBf, closeSummary, closeFiles, deleteFile, deleteAll);
+ scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
}
}
private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
{
+ if (references.get() != 0)
+ throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+
final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
final OpOrder.Barrier barrier;
if (cfs != null)
@@ -619,7 +623,6 @@ public class SSTableReader extends SSTable
{
if (barrier != null)
barrier.await();
- assert references.get() == 0;
if (closeBf)
bf.close();
if (closeSummary)
@@ -629,6 +632,8 @@ public class SSTableReader extends SSTable
ifile.cleanup();
dfile.cleanup();
}
+ if (runOnClose != null)
+ runOnClose.run();
if (deleteAll)
{
/**
@@ -650,6 +655,16 @@ public class SSTableReader extends SSTable
});
}
+ public boolean equals(Object that)
+ {
+ return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+ }
+
+ public int hashCode()
+ {
+ return this.descriptor.hashCode();
+ }
+
public String getFilename()
{
return dfile.path;
@@ -894,6 +909,53 @@ public class SSTableReader extends SSTable
}
}
+ public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+ {
+ synchronized (replaceLock)
+ {
+ assert replacedBy == null;
+
+ if (newStart.compareTo(this.first) > 0)
+ {
+ if (newStart.compareTo(this.last) > 0)
+ {
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, 0);
+ CLibrary.trySkipCache(ifile.path, 0, 0);
+ runOnClose.run();
+ }
+ };
+ }
+ else
+ {
+ final long dataStart = getPosition(newStart, Operator.GE).position;
+ final long indexStart = getIndexScanPosition(newStart);
+ this.runOnClose = new Runnable()
+ {
+ public void run()
+ {
+ CLibrary.trySkipCache(dfile.path, 0, dataStart);
+ CLibrary.trySkipCache(ifile.path, 0, indexStart);
+ runOnClose.run();
+ }
+ };
+ }
+ }
+
+ if (readMeterSyncFuture != null)
+ readMeterSyncFuture.cancel(false);
+ SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata);
+ replacement.readMeter = this.readMeter;
+ replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
+ }
+
/**
* Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
* be built at the target samplingLevel. This (original) SSTableReader instance will be marked as replaced, have
@@ -1022,7 +1084,7 @@ public class SSTableReader extends SSTable
return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
}
- public static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+ private static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
{
if (binarySearchResult == -1)
return -1;
@@ -1245,6 +1307,12 @@ public class SSTableReader extends SSTable
return positions;
}
+ public void invalidateCacheKey(DecoratedKey key)
+ {
+ KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.key);
+ keyCache.remove(cacheKey);
+ }
+
public void cacheKey(DecoratedKey key, RowIndexEntry info)
{
CachingOptions caching = metadata.getCaching();
@@ -1261,29 +1329,6 @@ public class SSTableReader extends SSTable
keyCache.put(cacheKey, info);
}
- public void preheat(Map<DecoratedKey, RowIndexEntry> cachedKeys) throws IOException
- {
- RandomAccessFile f = new RandomAccessFile(getFilename(), "r");
-
- try
- {
- int fd = CLibrary.getfd(f.getFD());
-
- for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeys.entrySet())
- {
- cacheKey(entry.getKey(), entry.getValue());
-
- // add to the cache but don't do actual preheating if we have it disabled in the config
- if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0)
- CLibrary.preheatPage(fd, entry.getValue().position);
- }
- }
- finally
- {
- FileUtils.closeQuietly(f);
- }
- }
-
public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
{
return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.key), updateStats);
@@ -1662,6 +1707,20 @@ public class SSTableReader extends SSTable
return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
}
+ public SSTableReader getCurrentReplacement()
+ {
+ synchronized (replaceLock)
+ {
+ SSTableReader cur = this, next = replacedBy;
+ while (next != null)
+ {
+ cur = next;
+ next = next.replacedBy;
+ }
+ return cur;
+ }
+ }
+
/**
* TODO: Move someplace reusable
*/
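getCurrentReplacement() exists because several rewriters may clone the same source reader in sequence, and callers must always address the newest clone. The chain walk reduces to the following (standalone sketch, names hypothetical; synchronization on replaceLock elided):

// Standalone sketch of the replacedBy chain walk behind getCurrentReplacement().
class ChainNode
{
    ChainNode replacedBy; // null when this node is the newest version

    ChainNode current()
    {
        ChainNode cur = this;
        while (cur.replacedBy != null)
            cur = cur.replacedBy;
        return cur;
    }
}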
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
new file mode 100644
index 0000000..2dfefc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.CLibrary;
+
+public class SSTableRewriter
+{
+
+ private static final long preemptiveOpenInterval;
+ static
+ {
+ long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+ if (interval < 0)
+ interval = Long.MAX_VALUE;
+ preemptiveOpenInterval = interval;
+ }
+
+ private final DataTracker dataTracker;
+ private final ColumnFamilyStore cfs;
+
+ private final long maxAge;
+ private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
+ private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
+ private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
+
+ private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
+ private long currentlyOpenedEarlyAt; // the position (in bytes) in the target file we last (re)opened at
+
+ private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+ private final OperationType rewriteType; // the type of rewrite/compaction being performed
+ private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+
+ private SSTableWriter writer;
+ private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
+
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+ {
+ this.rewriting = rewriting;
+ for (SSTableReader sstable : rewriting)
+ {
+ originalStarts.put(sstable.descriptor, sstable.first);
+ fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
+ }
+ this.dataTracker = cfs.getDataTracker();
+ this.cfs = cfs;
+ this.maxAge = maxAge;
+ this.rewriteType = rewriteType;
+ this.isOffline = isOffline;
+ }
+
+ public SSTableWriter currentWriter()
+ {
+ return writer;
+ }
+
+ public RowIndexEntry append(AbstractCompactedRow row)
+ {
+ // we do this before appending to ensure we can resetAndTruncate() safely if the append fails
+ maybeReopenEarly(row.key);
+ RowIndexEntry index = writer.append(row);
+ if (!isOffline)
+ {
+ if (index == null)
+ {
+ cfs.invalidateCachedRow(row.key);
+ }
+ else
+ {
+ boolean save = false;
+ for (SSTableReader reader : rewriting)
+ {
+ if (reader.getCachedPosition(row.key, false) != null)
+ {
+ save = true;
+ break;
+ }
+ }
+ if (save)
+ cachedKeys.put(row.key, index);
+ }
+ }
+ return index;
+ }
+
+ // attempts to append the row, if fails resets the writer position
+ public RowIndexEntry tryAppend(AbstractCompactedRow row)
+ {
+ mark();
+ try
+ {
+ return append(row);
+ }
+ catch (Throwable t)
+ {
+ resetAndTruncate();
+ throw t;
+ }
+ }
+
+ private void mark()
+ {
+ writer.mark();
+ }
+
+ private void resetAndTruncate()
+ {
+ writer.resetAndTruncate();
+ }
+
+ private void maybeReopenEarly(DecoratedKey key)
+ {
+ if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
+ {
+ if (isOffline)
+ {
+ for (SSTableReader reader : rewriting)
+ {
+ RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
+ CLibrary.trySkipCache(fileDescriptors.get(reader.descriptor), 0, index == null ? 0 : index.position);
+ }
+ }
+ else
+ {
+ SSTableReader reader = writer.openEarly(maxAge);
+ if (reader != null)
+ {
+ replaceReader(currentlyOpenedEarly, reader);
+ currentlyOpenedEarly = reader;
+ currentlyOpenedEarlyAt = writer.getFilePointer();
+ moveStarts(reader, Functions.constant(reader.last), false);
+ }
+ }
+ }
+ }
+
+ public void abort()
+ {
+ if (writer == null)
+ return;
+ moveStarts(null, Functions.forMap(originalStarts), true);
+ List<SSTableReader> close = new ArrayList<>(finished);
+ if (currentlyOpenedEarly != null)
+ close.add(currentlyOpenedEarly);
+ // also remove already completed SSTables
+ for (SSTableReader sstable : close)
+ sstable.markObsolete();
+ // releases reference in replaceReaders
+ if (!isOffline)
+ {
+ dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList());
+ dataTracker.unmarkCompacting(close);
+ }
+ writer.abort();
+ }
+
+ /**
+ * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
+ * needed, and transferring any key cache entries over to the new reader, expiring them from the old. If reset
+ * is true, we are instead restoring the starts of the readers from before the rewriting began.
+ *
+ * @param newReader the rewritten reader that replaces them for this region
+ * @param newStarts a function mapping a reader's descriptor to their new start value
+ * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
+ */
+ private void moveStarts(SSTableReader newReader, Function<? super Descriptor, DecoratedKey> newStarts, boolean reset)
+ {
+ if (isOffline)
+ return;
+ List<SSTableReader> toReplace = new ArrayList<>();
+ List<SSTableReader> replaceWith = new ArrayList<>();
+ final List<DecoratedKey> invalidateKeys = new ArrayList<>();
+ if (!reset)
+ {
+ invalidateKeys.addAll(cachedKeys.keySet());
+ for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
+ newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
+ }
+ cachedKeys = new HashMap<>();
+ for (final SSTableReader sstable : rewriting)
+ {
+ DecoratedKey newStart = newStarts.apply(sstable.descriptor);
+ assert newStart != null;
+ if (sstable.first.compareTo(newStart) < 0 || (reset && newStart != sstable.first))
+ {
+ toReplace.add(sstable);
+ // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
+ // note: only one such writer should be written to at any moment
+ replaceWith.add(sstable.getCurrentReplacement().cloneWithNewStart(newStart, new Runnable()
+ {
+ public void run()
+ {
+ // this is somewhat racy, in that we could theoretically be closing this old reader
+ // when an even older reader is still in use, but it's not likely to have any major impact
+ for (DecoratedKey key : invalidateKeys)
+ sstable.invalidateCacheKey(key);
+ }
+ }));
+ }
+ }
+ replaceReaders(toReplace, replaceWith);
+ rewriting.removeAll(toReplace);
+ rewriting.addAll(replaceWith);
+ }
+
+ private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith)
+ {
+ if (isOffline)
+ return;
+ Set<SSTableReader> toReplaceSet;
+ if (toReplace != null)
+ {
+ toReplace.setReplacedBy(replaceWith);
+ toReplaceSet = Collections.singleton(toReplace);
+ }
+ else
+ {
+ dataTracker.markCompacting(Collections.singleton(replaceWith));
+ toReplaceSet = Collections.emptySet();
+ }
+ replaceReaders(toReplaceSet, Collections.singleton(replaceWith));
+ }
+
+ private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+ {
+ if (isOffline)
+ return;
+ dataTracker.replaceReaders(toReplace, replaceWith);
+ }
+
+ public void switchWriter(SSTableWriter newWriter)
+ {
+ if (writer == null)
+ {
+ writer = newWriter;
+ return;
+ }
+ // tmp = false because later we want to query it with descriptor from SSTableReader
+ SSTableReader reader = writer.closeAndOpenReader(maxAge);
+ finished.add(reader);
+ replaceReader(currentlyOpenedEarly, reader);
+ moveStarts(reader, Functions.constant(reader.last), false);
+ currentlyOpenedEarly = null;
+ currentlyOpenedEarlyAt = 0;
+ writer = newWriter;
+ }
+
+ public void finish()
+ {
+ finish(-1);
+ }
+ public void finish(long repairedAt)
+ {
+ finish(true, repairedAt);
+ }
+ public void finish(boolean cleanupOldReaders)
+ {
+ finish(cleanupOldReaders, -1);
+ }
+ public void finish(boolean cleanupOldReaders, long repairedAt)
+ {
+ if (writer.getFilePointer() > 0)
+ {
+ SSTableReader reader = repairedAt < 0 ?
+ writer.closeAndOpenReader(maxAge) :
+ writer.closeAndOpenReader(maxAge, repairedAt);
+ finished.add(reader);
+ replaceReader(currentlyOpenedEarly, reader);
+ moveStarts(reader, Functions.constant(reader.last), false);
+ }
+ else
+ {
+ writer.abort();
+ writer = null;
+ }
+
+ if (!isOffline)
+ {
+ dataTracker.unmarkCompacting(finished);
+ if (cleanupOldReaders)
+ dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
+ }
+ else if (cleanupOldReaders)
+ {
+ for (SSTableReader reader : rewriting)
+ {
+ reader.markObsolete();
+ reader.releaseReference();
+ }
+ }
+ }
+
+ public List<SSTableReader> finished()
+ {
+ return finished;
+ }
+}
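Taken together, the expected lifecycle of SSTableRewriter is: install a writer, append rows (the target file is reopened early each time it grows by preemptiveOpenInterval bytes), then finish() on success or abort() on failure. A hypothetical driver, with the first writer and the row iterator supplied by the caller:

// Hypothetical driver for SSTableRewriter; 'firstWriter' and 'rows' are
// caller-supplied stand-ins, not part of this patch.
static java.util.List<SSTableReader> rewrite(ColumnFamilyStore cfs,
                                             java.util.Set<SSTableReader> sstables,
                                             long maxAge,
                                             SSTableWriter firstWriter,
                                             java.util.Iterator<AbstractCompactedRow> rows)
{
    SSTableRewriter rewriter = new SSTableRewriter(cfs, sstables, maxAge, OperationType.COMPACTION, false);
    try
    {
        rewriter.switchWriter(firstWriter); // install the first target writer
        while (rows.hasNext())
            rewriter.append(rows.next());   // may openEarly() the in-progress file as it grows
        rewriter.finish();                  // close the last writer and swap readers in the DataTracker
        return rewriter.finished();
    }
    catch (Throwable t)
    {
        rewriter.abort();                   // restore original starts, mark partial output obsolete
        throw t;
    }
}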
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 4a7729e..1c9c5fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -17,26 +17,51 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.*;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import com.google.common.collect.Sets;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnIndex;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.CounterCell;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FilterFactory;
@@ -110,17 +135,16 @@ public class SSTableWriter extends SSTable
if (compression)
{
- dbuilder = SegmentedFile.getCompressedBuilder();
dataFile = SequentialWriter.open(getFilename(),
descriptor.filenameFor(Component.COMPRESSION_INFO),
- !metadata.populateIoCacheOnFlush(),
metadata.compressionParameters(),
sstableMetadataCollector);
+ dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
}
else
{
+ dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush(), new File(descriptor.filenameFor(Component.CRC)));
}
this.sstableMetadataCollector = sstableMetadataCollector;
@@ -299,9 +323,16 @@ public class SSTableWriter extends SSTable
*/
public void abort()
{
- assert descriptor.temporary;
- FileUtils.closeQuietly(iwriter);
- FileUtils.closeQuietly(dataFile);
+ assert descriptor.type.isTemporary;
+ if (iwriter == null && dataFile == null)
+ return;
+ if (iwriter != null)
+ {
+ FileUtils.closeQuietly(iwriter.indexFile);
+ iwriter.bf.close();
+ }
+ if (dataFile != null)
+ FileUtils.closeQuietly(dataFile);
Set<Component> components = SSTable.componentsFor(descriptor);
try
@@ -326,6 +357,54 @@ public class SSTableWriter extends SSTable
last = lastWrittenKey = getMinimalKey(last);
}
+ public SSTableReader openEarly(long maxDataAge)
+ {
+ StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),
+ repairedAt).get(MetadataType.STATS);
+
+ // find the max (exclusive) readable key
+ DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+ if (exclusiveUpperBoundOfReadableIndex == null)
+ return null;
+
+ // create temp links if they don't already exist
+ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+ }
+
+ // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable to other consumers
+ SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
+ SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+ SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+ components, metadata,
+ partitioner, ifile,
+ dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+ iwriter.bf, maxDataAge, sstableMetadata);
+
+ // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+ DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+ if (inclusiveUpperBoundOfReadableData == null)
+ return null;
+ int offset = 2;
+ while (true)
+ {
+ RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+ if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+ break;
+ inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+ if (inclusiveUpperBoundOfReadableData == null)
+ return null;
+ }
+ sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+ return sstable;
+ }
+
public SSTableReader closeAndOpenReader()
{
return closeAndOpenReader(System.currentTimeMillis());
@@ -395,7 +474,7 @@ public class SSTableWriter extends SSTable
private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
{
- SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
+ SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
try
{
desc.getMetadataSerializer().serialize(components, out.stream);
@@ -412,7 +491,7 @@ public class SSTableWriter extends SSTable
static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
{
- Descriptor newdesc = tmpdesc.asTemporary(false);
+ Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
rename(tmpdesc, newdesc, components);
return newdesc;
}
@@ -454,13 +533,19 @@ public class SSTableWriter extends SSTable
IndexWriter(long keyCount)
{
- indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
- !metadata.populateIoCacheOnFlush());
+ indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
}
+ // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
+ DecoratedKey getMaxReadableKey(int offset)
+ {
+ long maxIndexLength = indexFile.getLastFlushOffset();
+ return summary.getMaxReadableKey(maxIndexLength, offset);
+ }
+
public void append(DecoratedKey key, RowIndexEntry indexEntry)
{
bf.add(key.key);
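openEarly() rests on one filesystem fact: a hard link shares the original file's inode, so a reader opened on the tmplink path sees every byte the writer keeps appending to the tmp path. A runnable illustration (file names made up for the example):

// Runnable illustration of the hard-link trick behind openEarly().
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class EarlyLink
{
    public static void main(String[] args) throws IOException
    {
        Path data = Paths.get("ks-cf-tmp-ka-1-Data.db");
        Path link = Paths.get("ks-cf-tmplink-ka-1-Data.db");
        Files.write(data, new byte[]{ 1, 2, 3 });
        if (!Files.exists(link))
            Files.createLink(link, data);                  // same inode, second name
        Files.write(data, new byte[]{ 4 }, StandardOpenOption.APPEND);
        System.out.println(Files.size(link));              // prints 4: the link sees the append
    }
}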
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 84789a6..7ba2895 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -19,15 +19,25 @@ package org.apache.cassandra.io.sstable.metadata;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
+import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.MurmurHash;
@@ -246,8 +256,8 @@ public class MetadataCollector
compressionRatio,
estimatedTombstoneDropTime,
sstableLevel,
- minColumnNames,
- maxColumnNames,
+ ImmutableList.copyOf(minColumnNames),
+ ImmutableList.copyOf(maxColumnNames),
hasLegacyCounterShards,
repairedAt));
components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index c0cd96e..7414208 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -146,7 +146,7 @@ public class MetadataSerializer implements IMetadataSerializer
private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
{
- Descriptor tmpDescriptor = descriptor.asTemporary(true);
+ Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP);
try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index 6a23fde..b284f61 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
long length = new File(path).length();
return new BufferedPoolingSegmentedFile(path, length);
}
+
+ public SegmentedFile openEarly(String path)
+ {
+ return complete(path);
+ }
}
protected RandomAccessReader createReader(String path)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 790b42b..aa031e3 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedSegmentedFile extends SegmentedFile
long length = new File(path).length();
return new BufferedSegmentedFile(path, length);
}
+
+ public SegmentedFile openEarly(String path)
+ {
+ return complete(path);
+ }
}
public FileDataInput getSegment(long position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 3c4c257..98492da 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -9,10 +9,10 @@ public class ChecksummedSequentialWriter extends SequentialWriter
private final SequentialWriter crcWriter;
private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
- public ChecksummedSequentialWriter(File file, int bufferSize, boolean skipIOCache, File crcPath)
+ public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
{
- super(file, bufferSize, skipIOCache);
- crcWriter = new SequentialWriter(crcPath, 8 * 1024, true);
+ super(file, bufferSize);
+ crcWriter = new SequentialWriter(crcPath, 8 * 1024);
crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
crcMetadata.writeChunkSize(buffer.length);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 121bdb2..1803e69 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
@@ -30,8 +31,13 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
this.metadata = metadata;
}
- public static class Builder extends SegmentedFile.Builder
+ public static class Builder extends CompressedSegmentedFile.Builder
{
+ public Builder(CompressedSequentialWriter writer)
+ {
+ super(writer);
+ }
+
public void addPotentialBoundary(long boundary)
{
// only one segment in a standard-io file
@@ -39,7 +45,12 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
public SegmentedFile complete(String path)
{
- return new CompressedPoolingSegmentedFile(path, CompressionMetadata.create(path));
+ return new CompressedPoolingSegmentedFile(path, metadata(path, false));
+ }
+
+ public SegmentedFile openEarly(String path)
+ {
+ return new CompressedPoolingSegmentedFile(path, metadata(path, true));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index d0ea3fd..4afe0a0 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
@@ -32,14 +33,35 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
public static class Builder extends SegmentedFile.Builder
{
+ protected final CompressedSequentialWriter writer;
+ public Builder(CompressedSequentialWriter writer)
+ {
+ this.writer = writer;
+ }
+
public void addPotentialBoundary(long boundary)
{
// only one segment in a standard-io file
}
+ protected CompressionMetadata metadata(String path, boolean early)
+ {
+ if (writer == null)
+ return CompressionMetadata.create(path);
+ else if (early)
+ return writer.openEarly();
+ else
+ return writer.openAfterClose();
+ }
+
public SegmentedFile complete(String path)
{
- return new CompressedSegmentedFile(path, CompressionMetadata.create(path));
+ return new CompressedSegmentedFile(path, metadata(path, false));
+ }
+
+ public SegmentedFile openEarly(String path)
+ {
+ return new CompressedSegmentedFile(path, metadata(path, true));
}
}
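
The metadata(path, early) helper picks among three sources of CompressionMetadata: with no writer it parses the compression info component already on disk via CompressionMetadata.create(path); with a live writer, openEarly() snapshots the chunk offsets written so far, while openAfterClose() returns the final, sealed metadata. A sketch of the plain read path, where no writer exists (illustrative only):

    import org.apache.cassandra.io.util.SegmentedFile;

    public class ReadPathSketch
    {
        // Opening an sstable that is already complete on disk: the builder is
        // created without a writer, so complete() falls back to
        // CompressionMetadata.create(path).
        static SegmentedFile openExisting(String dataPath)
        {
            SegmentedFile.Builder builder = SegmentedFile.getCompressedBuilder();
            return builder.complete(dataPath);
        }
    }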
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 15d890c..875c9d5 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -74,6 +74,11 @@ public class FileUtils
canCleanDirectBuffers = canClean;
}
+ public static void createHardLink(String from, String to)
+ {
+ createHardLink(new File(from), new File(to));
+ }
+
public static void createHardLink(File from, File to)
{
if (to.exists())
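
The new String overload is a convenience wrapper that spares each call site the two File allocations. A hypothetical call (both paths invented):

    import org.apache.cassandra.io.util.FileUtils;

    public class HardLinkSketch
    {
        public static void main(String[] args)
        {
            // Link a file under a second name; placeholders, not real sstable paths.
            FileUtils.createHardLink("/tmp/demo-src.db", "/tmp/demo-link.db");
        }
    }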
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 278e0f6..b8a46bc 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import com.sun.jna.Native;
-import com.sun.jna.Pointer;
import org.apache.cassandra.config.DatabaseDescriptor;
import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
/**
* An off-heap region of memory that must be manually free'd when no longer needed.
@@ -145,6 +145,24 @@ public class Memory
}
}
+ public void setBytes(long memoryOffset, ByteBuffer buffer)
+ {
+ if (buffer == null)
+ throw new NullPointerException();
+ else if (buffer.remaining() == 0)
+ return;
+ checkPosition(memoryOffset + buffer.remaining());
+ if (buffer.hasArray())
+ {
+ setBytes(memoryOffset, buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ }
+ else if (buffer instanceof DirectBuffer)
+ {
+ unsafe.copyMemory(((DirectBuffer) buffer).address() + buffer.position(), peer + memoryOffset, buffer.remaining());
+ }
+ else
+ throw new IllegalStateException();
+ }
/**
* Transfers count bytes from buffer to Memory
*
@@ -263,6 +281,18 @@ public class Memory
assert offset >= 0 && offset < size : "Illegal offset: " + offset + ", size: " + size;
}
+ public void put(long trgOffset, Memory memory, long srcOffset, long size)
+ {
+ unsafe.copyMemory(memory.peer + srcOffset, peer + trgOffset, size);
+ }
+
+ public Memory copy(long newSize)
+ {
+ Memory copy = Memory.allocate(newSize);
+ copy.put(0, this, 0, Math.min(size(), newSize));
+ return copy;
+ }
+
public void free()
{
assert peer != 0;
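
The new setBytes(long, ByteBuffer) overload handles the two buffer flavours separately: heap buffers are copied through their backing array, direct buffers address-to-address via Unsafe, and anything else is rejected with IllegalStateException. copy(long) pairs with put(...) to grow or shrink a region. A small standalone sketch of the new calls (illustrative only):

    import java.nio.ByteBuffer;
    import org.apache.cassandra.io.util.Memory;

    public class MemorySketch
    {
        public static void main(String[] args)
        {
            Memory mem = Memory.allocate(16);
            ByteBuffer heap = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
            mem.setBytes(0, heap);                  // array-backed path
            ByteBuffer direct = ByteBuffer.allocateDirect(4);
            direct.put(new byte[]{5, 6, 7, 8});
            direct.flip();
            mem.setBytes(4, direct);                // DirectBuffer path
            Memory bigger = mem.copy(32);           // put() of min(size, newSize) bytes
            mem.free();
            bigger.free();
        }
    }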
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 39a4160..450553b 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -161,28 +161,36 @@ public class MmappedSegmentedFile extends SegmentedFile
public SegmentedFile complete(String path)
{
long length = new File(path).length();
- // add a sentinel value == length
- if (length != boundaries.get(boundaries.size() - 1))
- boundaries.add(length);
// create the segments
return new MmappedSegmentedFile(path, length, createSegments(path));
}
+ public SegmentedFile openEarly(String path)
+ {
+ return complete(path);
+ }
+
private Segment[] createSegments(String path)
{
- int segcount = boundaries.size() - 1;
- Segment[] segments = new Segment[segcount];
RandomAccessFile raf;
-
+ long length;
try
{
raf = new RandomAccessFile(path, "r");
+ length = raf.length();
}
- catch (FileNotFoundException e)
+ catch (IOException e)
{
throw new RuntimeException(e);
}
+ // add a sentinel value == length
+ List<Long> boundaries = new ArrayList<>(this.boundaries);
+ if (length != boundaries.get(boundaries.size() - 1))
+ boundaries.add(length);
+ int segcount = boundaries.size() - 1;
+ Segment[] segments = new Segment[segcount];
+
try
{
for (int i = 0; i < segcount; i++)
@@ -221,7 +229,7 @@ public class MmappedSegmentedFile extends SegmentedFile
super.deserializeBounds(in);
int size = in.readInt();
- List<Long> temp = new ArrayList<Long>(size);
+ List<Long> temp = new ArrayList<>(size);
for (int i = 0; i < size; i++)
temp.add(in.readLong());
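
createSegments now reads the file length itself and appends the length sentinel to a private copy of the boundary list, so the builder's shared state is never mutated and openEarly/complete can each be invoked, repeatedly, against whatever length the file has at that moment. A toy illustration of the sentinel arithmetic (values invented):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SentinelSketch
    {
        public static void main(String[] args)
        {
            List<Long> shared = Arrays.asList(0L, 4096L); // builder's boundaries
            long length = 10000L;                         // file length at open time
            List<Long> boundaries = new ArrayList<>(shared);
            if (length != boundaries.get(boundaries.size() - 1))
                boundaries.add(length);                   // sentinel == current length
            int segcount = boundaries.size() - 1;         // one segment per adjacent pair
            System.out.println(boundaries + " -> " + segcount + " segments");
        }
    }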
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 892611c..01f4e31 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -21,6 +21,7 @@ import org.apache.cassandra.service.FileCacheService;
public abstract class PoolingSegmentedFile extends SegmentedFile
{
+ final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
protected PoolingSegmentedFile(String path, long length)
{
super(path, length);
@@ -33,7 +34,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
public FileDataInput getSegment(long position)
{
- RandomAccessReader reader = FileCacheService.instance.get(path);
+ RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
if (reader == null)
reader = createReader(path);
@@ -46,11 +47,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
public void recycle(RandomAccessReader reader)
{
- FileCacheService.instance.put(reader);
+ FileCacheService.instance.put(cacheKey, reader);
}
public void cleanup()
{
- FileCacheService.instance.invalidate(path);
+ FileCacheService.instance.invalidate(cacheKey, path);
}
}
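
Keying the reader pool by a per-instance CacheKey object instead of the path string matters once files can be opened early: an early-opened SegmentedFile and the final one share a path, and with a path key the second instance would recycle or invalidate readers belonging to the first. A toy illustration of identity keying (not the real FileCacheService):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class IdentityKeySketch
    {
        public static void main(String[] args)
        {
            Map<Object, String> pools = new ConcurrentHashMap<>();
            Object earlyKey = new Object(); // early-opened file over one path
            Object finalKey = new Object(); // final file over the same path
            pools.put(earlyKey, "early readers");
            pools.put(finalKey, "final readers");
            System.out.println(pools.size()); // 2 -- a String path key would collide
        }
    }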
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index d4da177..be549a6 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -28,6 +28,7 @@ import java.util.NoSuchElementException;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.utils.Pair;
/**
@@ -75,7 +76,12 @@ public abstract class SegmentedFile
public static Builder getCompressedBuilder()
{
- return new CompressedPoolingSegmentedFile.Builder();
+ return getCompressedBuilder(null);
+ }
+
+ public static Builder getCompressedBuilder(CompressedSequentialWriter writer)
+ {
+ return new CompressedPoolingSegmentedFile.Builder(writer);
}
public abstract FileDataInput getSegment(long position);
@@ -111,6 +117,12 @@ public abstract class SegmentedFile
*/
public abstract SegmentedFile complete(String path);
+ /**
+ * Called before the writer has finished its output, to open an early view over the data written to the file so far.
+ * @param path The file on disk.
+ */
+ public abstract SegmentedFile openEarly(String path);
+
public void serializeBounds(DataOutput out) throws IOException
{
out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
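
Taken together, the builder now has two terminal operations. A sketch of the intended call pattern from a compaction-style writer (driver code hypothetical; writer construction elided):

    import org.apache.cassandra.io.compress.CompressedSequentialWriter;
    import org.apache.cassandra.io.util.SegmentedFile;

    public class EarlyOpenSketch
    {
        // Hypothetical driver: the writer and path come from the compaction task.
        static SegmentedFile open(CompressedSequentialWriter writer, String path, boolean stillWriting)
        {
            SegmentedFile.Builder builder = SegmentedFile.getCompressedBuilder(writer);
            // openEarly exposes only data the writer has flushed so far;
            // complete expects the writer to be finished.
            return stillWriting ? builder.openEarly(path) : builder.complete(path);
        }
    }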
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 1d8b95e..7a7eb63 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -45,7 +45,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
private final String filePath;
protected byte[] buffer;
- private final boolean skipIOCache;
private final int fd;
private final int directoryFD;
// directory should be synced only after first file sync, in other words, only once per file
@@ -56,9 +55,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected final RandomAccessFile out;
- // used if skip I/O cache was enabled
- private long ioCacheStartOffset = 0, bytesSinceCacheFlush = 0;
-
// whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
// latency spikes
private boolean trickleFsync;
@@ -66,8 +62,9 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
private int bytesSinceTrickleFsync = 0;
public final DataOutputPlus stream;
+ protected long lastFlushOffset;
- public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
+ public SequentialWriter(File file, int bufferSize)
{
try
{
@@ -81,7 +78,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
filePath = file.getAbsolutePath();
buffer = new byte[bufferSize];
- this.skipIOCache = skipIOCache;
this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
@@ -100,31 +96,25 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
public static SequentialWriter open(File file)
{
- return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
- }
-
- public static SequentialWriter open(File file, boolean skipIOCache)
- {
- return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache);
+ return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE);
}
- public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache)
+ public static SequentialWriter open(File file, int bufferSize)
{
- return new SequentialWriter(file, bufferSize, skipIOCache);
+ return new SequentialWriter(file, bufferSize);
}
- public static ChecksummedSequentialWriter open(File file, boolean skipIOCache, File crcPath)
+ public static ChecksummedSequentialWriter open(File file, File crcPath)
{
- return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache, crcPath);
+ return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, crcPath);
}
public static CompressedSequentialWriter open(String dataFilePath,
String offsetsPath,
- boolean skipIOCache,
CompressionParameters parameters,
MetadataCollector sstableMetadataCollector)
{
- return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, skipIOCache, parameters, sstableMetadataCollector);
+ return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
}
public void write(int value) throws ClosedChannelException
@@ -302,23 +292,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
}
}
- if (skipIOCache)
- {
- // we don't know when the data reaches disk since we aren't
- // calling flush
- // so we continue to clear pages we don't need from the first
- // offset we see
- // periodically we update this starting offset
- bytesSinceCacheFlush += validBufferBytes;
-
- if (bytesSinceCacheFlush >= RandomAccessReader.CACHE_FLUSH_INTERVAL_IN_BYTES)
- {
- CLibrary.trySkipCache(this.fd, ioCacheStartOffset, 0);
- ioCacheStartOffset = bufferOffset;
- bytesSinceCacheFlush = 0;
- }
- }
-
// Remember that we wrote, so we don't write it again on next flush().
resetBuffer();
@@ -335,6 +308,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
try
{
out.write(buffer, 0, validBufferBytes);
+ lastFlushOffset += validBufferBytes;
}
catch (IOException e)
{
@@ -431,6 +405,11 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
resetBuffer();
}
+ public long getLastFlushOffset()
+ {
+ return lastFlushOffset;
+ }
+
public void truncate(long toSize)
{
try
@@ -458,9 +437,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
buffer = null;
- if (skipIOCache && bytesSinceCacheFlush > 0)
- CLibrary.trySkipCache(fd, 0, 0);
-
try
{
out.close();
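
lastFlushOffset advances in flushData() as each buffer reaches the file, so getLastFlushOffset() tells an early open exactly how much of the file is safe to hand to readers. A small standalone sketch (temp file; illustrative only):

    import java.io.File;
    import org.apache.cassandra.io.util.SequentialWriter;

    public class FlushOffsetSketch
    {
        public static void main(String[] args) throws Exception
        {
            File f = File.createTempFile("seqwriter", ".bin");
            SequentialWriter writer = SequentialWriter.open(f);
            writer.write(new byte[1234]);
            writer.sync();                          // push the buffer to the file
            long safe = writer.getLastFlushOffset();
            System.out.println(safe);               // 1234: readable extent so far
            writer.close();
            f.delete();
        }
    }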