You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/05/07 21:22:20 UTC
[1/2] Single-pass compaction patch by jbellis;
reviewed by marcuse for CASSANDRA-4180
Updated Branches:
refs/heads/trunk 3ca160ea7 -> a9bd531bc
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8defb10..43ea5a8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -40,6 +40,9 @@ public class SSTableWriter extends SSTable
{
private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
+ // not very random, but the only value that can't be mistaken for a legal column-name length
+ public static final int END_OF_ROW = 0x0000;
+
private IndexWriter iwriter;
private SegmentedFile.Builder dbuilder;
private final SequentialWriter dataFile;
@@ -135,7 +138,7 @@ public class SSTableWriter extends SSTable
return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
}
- private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index)
+ private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
{
lastWrittenKey = decoratedKey;
last = lastWrittenKey;
@@ -144,30 +147,31 @@ public class SSTableWriter extends SSTable
if (logger.isTraceEnabled())
logger.trace("wrote " + decoratedKey + " at " + dataPosition);
- // range tombstones are part of the Atoms we write as the row contents, so RIE only gets row-level tombstones
- RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo.getTopLevelDeletion(), index);
- iwriter.append(decoratedKey, entry);
+ iwriter.append(decoratedKey, index);
dbuilder.addPotentialBoundary(dataPosition);
- return entry;
}
+ /**
+ * @param row
+ * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ */
public RowIndexEntry append(AbstractCompactedRow row)
{
long currentPosition = beforeAppend(row.key);
+ RowIndexEntry entry;
try
{
- ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
- long dataStart = dataFile.getFilePointer();
- long dataSize = row.write(dataFile.stream);
- assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
- : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
+ entry = row.write(currentPosition, dataFile.stream);
+ if (entry == null)
+ return null;
}
catch (IOException e)
{
throw new FSWriteError(e, dataFile.getPath());
}
sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
- return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+ afterAppend(row.key, currentPosition, entry);
+ return entry;
}
public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -175,25 +179,8 @@ public class SSTableWriter extends SSTable
long startPosition = beforeAppend(decoratedKey);
try
{
- ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
-
- // Since the columnIndex may insert RangeTombstone marker, computing
- // the size of the data is tricky.
- DataOutputBuffer buffer = new DataOutputBuffer();
-
- // build column index && write columns
- ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, buffer);
- ColumnIndex index = builder.build(cf);
-
- TypeSizes typeSizes = TypeSizes.NATIVE;
- long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
- dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
-
- // Write deletion infos + column count
- DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
- dataFile.stream.writeInt(builder.writtenAtomCount());
- dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
- afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
+ RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+ afterAppend(decoratedKey, startPosition, entry);
}
catch (IOException e)
{
@@ -202,38 +189,26 @@ public class SSTableWriter extends SSTable
sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
}
+ public static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutput out) throws IOException
+ {
+ assert cf.getColumnCount() > 0 || cf.isMarkedForDelete();
+
+ ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.key, out);
+ ColumnIndex index = builder.build(cf);
+
+ out.writeShort(END_OF_ROW);
+ return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+ }
+
/**
* @throws IOException if a read from the DataInput fails
* @throws FSWriteError if a write to the dataFile fails
*/
- public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException
+ public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in) throws IOException
{
long currentPosition = beforeAppend(key);
- long dataStart;
- try
- {
- ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
- dataStart = dataFile.getFilePointer();
- // write row size
- dataFile.stream.writeLong(dataSize);
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, dataFile.getPath());
- }
DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version);
- int columnCount = in.readInt();
-
- try
- {
- DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
- dataFile.stream.writeInt(columnCount);
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, dataFile.getPath());
- }
// deserialize each column to obtain maxTimestamp and immediately serialize it.
long minTimestamp = Long.MAX_VALUE;
@@ -245,42 +220,45 @@ public class SSTableWriter extends SSTable
ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream, true);
OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
- for (int i = 0; i < columnCount; i++)
+ try
{
- // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
- // data size received, so we must reserialize the exact same data
- OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
- if (atom instanceof CounterColumn)
- atom = ((CounterColumn) atom).markDeltaToBeCleared();
-
- int deletionTime = atom.getLocalDeletionTime();
- if (deletionTime < Integer.MAX_VALUE)
+ while (true)
{
- tombstones.update(deletionTime);
- }
- minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
- maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
- maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
+ // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
+ // data size received, so we must reserialize the exact same data
+ OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
+ if (atom == null)
+ break;
+ if (atom instanceof CounterColumn)
+ atom = ((CounterColumn) atom).markDeltaToBeCleared();
+
+ int deletionTime = atom.getLocalDeletionTime();
+ if (deletionTime < Integer.MAX_VALUE)
+ {
+ tombstones.update(deletionTime);
+ }
+ minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
+ maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
+ maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
- try
- {
columnIndexer.add(atom); // This write the atom on disk too
}
- catch (IOException e)
- {
- throw new FSWriteError(e, dataFile.getPath());
- }
+
+ columnIndexer.finish();
+ dataFile.stream.writeShort(END_OF_ROW);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
}
- assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
- : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
sstableMetadataCollector.updateMinTimestamp(minTimestamp);
sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
sstableMetadataCollector.updateMaxLocalDeletionTime(maxLocalDeletionTime);
sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
- sstableMetadataCollector.addColumnCount(columnCount);
+ sstableMetadataCollector.addColumnCount(columnIndexer.writtenAtomCount());
sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
- afterAppend(key, currentPosition, deletionInfo, columnIndexer.build());
+ afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, deletionInfo.getTopLevelDeletion(), columnIndexer.build()));
return currentPosition;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index e1aaa56..92f5c7f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -27,15 +27,14 @@ import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.ning.compress.lzf.LZFInputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.service.StorageService;
@@ -43,7 +42,6 @@ import org.apache.cassandra.streaming.compress.CompressedInputStream;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
-import com.ning.compress.lzf.LZFInputStream;
public class IncomingStreamReader
{
@@ -157,28 +155,11 @@ public class IncomingStreamReader
while (bytesRead < length)
{
in.reset(0);
+
key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
- long dataSize = in.readLong();
-
- if (cfs.containsCachedRow(key) && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
- {
- // need to update row cache
- // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
- SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, localFile.getFilename(), key, 0, dataSize, ColumnSerializer.Flag.FROM_REMOTE);
- PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
- // We don't expire anything so the row shouldn't be empty
- assert !row.isEmpty();
- writer.append(row);
-
- // update cache
- ColumnFamily cf = row.getFullColumnFamily();
- cfs.maybeUpdateRowCache(key, cf);
- }
- else
- {
- writer.appendFromStream(key, cfs.metadata, dataSize, in);
- cfs.invalidateCachedRow(key);
- }
+ writer.appendFromStream(key, cfs.metadata, in);
+
+ cfs.invalidateCachedRow(key);
bytesRead += in.getBytesRead();
// when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 9e8bb3e..e3b02b1 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
@@ -103,15 +104,15 @@ public class SSTableExport
* </ul>
*
* @param out The output steam to write data
- * @param cf to which the metadata belongs
+ * @param deletionInfo
*/
- private static void writeMeta(PrintStream out, ColumnFamily cf)
+ private static void writeMeta(PrintStream out, DeletionInfo deletionInfo)
{
- if (!cf.deletionInfo().equals(DeletionInfo.LIVE))
+ if (!deletionInfo.equals(DeletionInfo.LIVE))
{
// begin meta
writeKey(out, "metadata");
- writeDeletionInfo(out, cf.deletionInfo().getTopLevelDeletion());
+ writeDeletionInfo(out, deletionInfo.getTopLevelDeletion());
out.print(",");
}
}
@@ -130,22 +131,22 @@ public class SSTableExport
*
* @param atoms column iterator
* @param out output stream
- * @param comparator columns comparator
* @param cfMetaData Column Family metadata (to get validator)
*/
- private static void serializeAtoms(Iterator<OnDiskAtom> atoms, PrintStream out, AbstractType<?> comparator, CFMetaData cfMetaData)
+ private static void serializeAtoms(Iterator<OnDiskAtom> atoms, PrintStream out, CFMetaData cfMetaData)
{
while (atoms.hasNext())
{
- writeJSON(out, serializeAtom(atoms.next(), comparator, cfMetaData));
+ writeJSON(out, serializeAtom(atoms.next(), cfMetaData));
if (atoms.hasNext())
out.print(", ");
}
}
- private static List<Object> serializeAtom(OnDiskAtom atom, AbstractType<?> comparator, CFMetaData cfMetaData)
+ private static List<Object> serializeAtom(OnDiskAtom atom, CFMetaData cfMetaData)
{
+ AbstractType<?> comparator = cfMetaData.comparator;
if (atom instanceof Column)
{
return serializeColumn((Column) atom, comparator, cfMetaData);
@@ -219,21 +220,22 @@ public class SSTableExport
*/
private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
{
- ColumnFamily columnFamily = row.getColumnFamily();
- CFMetaData cfMetaData = columnFamily.metadata();
- AbstractType<?> comparator = columnFamily.getComparator();
+ serializeRow(row.getColumnFamily().deletionInfo(), row, row.getColumnFamily().metadata(), key, out);
+ }
+ private static void serializeRow(DeletionInfo deletionInfo, Iterator<OnDiskAtom> atoms, CFMetaData metadata, DecoratedKey key, PrintStream out)
+ {
out.print("{");
writeKey(out, "key");
writeJSON(out, bytesToHex(key.key));
out.print(",");
- writeMeta(out, columnFamily);
+ writeMeta(out, deletionInfo);
writeKey(out, "columns");
out.print("[");
- serializeAtoms(row, out, comparator, cfMetaData);
+ serializeAtoms(atoms, out, metadata);
out.print("]");
out.print("}");
@@ -277,10 +279,10 @@ public class SSTableExport
*/
public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
{
- SSTableReader reader = SSTableReader.open(desc);
- SSTableScanner scanner = reader.getScanner();
+ SSTableReader sstable = SSTableReader.open(desc);
+ RandomAccessReader dfile = sstable.openDataReader();
- IPartitioner<?> partitioner = reader.partitioner;
+ IPartitioner<?> partitioner = sstable.partitioner;
if (excludes != null)
toExport.removeAll(Arrays.asList(excludes));
@@ -301,16 +303,20 @@ public class SSTableExport
lastKey = decoratedKey;
- scanner.seekTo(decoratedKey);
-
- if (!scanner.hasNext())
+ RowIndexEntry entry = sstable.getPosition(decoratedKey, SSTableReader.Operator.EQ);
+ if (entry == null)
continue;
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- if (!row.getKey().equals(decoratedKey))
- continue;
+ dfile.seek(entry.position);
+ ByteBufferUtil.readWithShortLength(dfile); // row key
+ if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+ dfile.readLong(); // row size
+ DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(dfile, sstable.descriptor.version);
+ int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? dfile.readInt() : Integer.MAX_VALUE;
+
+ Iterator<OnDiskAtom> atomIterator = sstable.metadata.getOnDiskIterator(dfile, columnCount, sstable.descriptor.version);
- serializeRow(row, decoratedKey, outs);
+ serializeRow(deletionInfo, atomIterator, sstable.metadata, decoratedKey, outs);
if (i != 0)
outs.println(",");
@@ -321,8 +327,6 @@ public class SSTableExport
outs.println("\n]");
outs.flush();
-
- scanner.close();
}
// This is necessary to accommodate the test suite since you cannot open a Reader more
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 338abb9..015ece0 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -52,12 +52,12 @@ public class SchemaLoader
private static Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
@BeforeClass
- public static void loadSchema() throws IOException
+ public static void loadSchema() throws IOException, ConfigurationException
{
loadSchema(false);
}
- public static void loadSchema(boolean withOldCfIds) throws IOException
+ public static void loadSchema(boolean withOldCfIds) throws IOException, ConfigurationException
{
// Cleanup first
cleanupAndLeaveDirs();
@@ -72,18 +72,13 @@ public class SchemaLoader
}
});
-
- // Migrations aren't happy if gossiper is not started
+ // Migrations aren't happy if gossiper is not started. Even if we don't use migrations though,
+ // some tests now expect us to start gossip for them.
startGossiper();
- try
- {
- for (KSMetaData ksm : schemaDefinition(withOldCfIds))
- MigrationManager.announceNewKeyspace(ksm);
- }
- catch (ConfigurationException e)
- {
- throw new RuntimeException(e);
- }
+ // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
+ // Schema.instance.load(schemaDefinition(withOldCfIds));
+ for (KSMetaData ksm : schemaDefinition(withOldCfIds))
+ MigrationManager.announceNewKeyspace(ksm);
}
public static void startGossiper()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index c1e35b4..a14fd99 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -285,21 +284,4 @@ public class Util
assert thrown : exception.getName() + " not received";
}
-
- public static ByteBuffer serializeForSSTable(ColumnFamily cf)
- {
- try
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
- DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), out);
- out.writeInt(cf.getColumnCount());
- new ColumnIndex.Builder(cf, ByteBufferUtil.EMPTY_BYTE_BUFFER, out).build(cf);
- return ByteBuffer.wrap(baos.toByteArray());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
deleted file mode 100644
index d1635d8..0000000
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.cassandra.db;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CLibrary;
-
-import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(OrderedJUnit4ClassRunner.class)
-public class ScrubTest extends SchemaLoader
-{
- public String TABLE = "Keyspace1";
- public String CF = "Standard1";
- public String CF3 = "Standard2";
-
- public String copySSTables(String cf) throws IOException
- {
- String root = System.getProperty("corrupt-sstable-root");
- assert root != null;
- File rootDir = new File(root);
- assert rootDir.isDirectory();
-
- File destDir = Directories.create(TABLE, cf).getDirectoryForNewSSTables(1);
-
- String corruptSSTableName = null;
-
- FileUtils.createDirectory(destDir);
- for (File srcFile : rootDir.listFiles())
- {
- if (srcFile.getName().equals(".svn"))
- continue;
- if (!srcFile.getName().contains(cf))
- continue;
- File destFile = new File(destDir, srcFile.getName());
- CLibrary.createHardLink(srcFile, destFile);
-
- assert destFile.exists() : destFile.getAbsoluteFile();
-
- if(destFile.getName().endsWith("Data.db"))
- corruptSSTableName = destFile.getCanonicalPath();
- }
-
- assert corruptSSTableName != null;
- return corruptSSTableName;
- }
-
- @Test
- public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
- {
- CompactionManager.instance.disableAutoCompaction();
- Table table = Table.open(TABLE);
- ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
-
- List<Row> rows;
-
- // insert data and verify we get it back w/ range query
- fillCF(cfs, 1);
- rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
- assertEquals(1, rows.size());
-
- CompactionManager.instance.performScrub(cfs);
-
- // check data is still there
- rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
- assertEquals(1, rows.size());
- }
-
- @Test
- public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
- {
- CompactionManager.instance.disableAutoCompaction();
- Table table = Table.open(TABLE);
- ColumnFamilyStore cfs = table.getColumnFamilyStore(CF3);
-
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF3);
- cf.delete(new DeletionInfo(0, 1)); // expired tombstone
- RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(1), cf);
- rm.applyUnsafe();
- cfs.forceBlockingFlush();
-
- CompactionManager.instance.performScrub(cfs);
- assert cfs.getSSTables().isEmpty();
- }
-
- @Test
- public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
- {
- CompactionManager.instance.disableAutoCompaction();
- Table table = Table.open(TABLE);
- ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
-
- List<Row> rows;
-
- // insert data and verify we get it back w/ range query
- fillCF(cfs, 10);
- rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
- assertEquals(10, rows.size());
-
- CompactionManager.instance.performScrub(cfs);
-
- // check data is still there
- rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
- assertEquals(10, rows.size());
- }
-
- @Test
- public void testScubOutOfOrder() throws Exception
- {
- CompactionManager.instance.disableAutoCompaction();
- Table table = Table.open(TABLE);
- String columnFamily = "Standard3";
- ColumnFamilyStore cfs = table.getColumnFamilyStore(columnFamily);
-
- /*
- * Code used to generate an outOfOrder sstable. The test for out-of-order key in SSTableWriter must also be commented out.
- * The test also assumes an ordered partitioner.
- *
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- cf.addColumn(new Column(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L));
-
- SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))),
- cfs.metadata.getIndexInterval(),
- cfs.metadata,
- cfs.partitioner,
- SSTableMetadata.createCollector());
- writer.append(Util.dk("a"), cf);
- writer.append(Util.dk("b"), cf);
- writer.append(Util.dk("z"), cf);
- writer.append(Util.dk("c"), cf);
- writer.append(Util.dk("y"), cf);
- writer.append(Util.dk("d"), cf);
- writer.closeAndOpenReader();
- */
-
- copySSTables(columnFamily);
- cfs.loadNewSSTables();
- assert cfs.getSSTables().size() > 0;
-
- List<Row> rows;
- rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
- assert !isRowOrdered(rows) : "'corrupt' test file actually was not";
-
- CompactionManager.instance.performScrub(cfs);
- rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
- assert isRowOrdered(rows) : "Scrub failed: " + rows;
- assert rows.size() == 6 : "Got " + rows.size();
- }
-
- private static boolean isRowOrdered(List<Row> rows)
- {
- DecoratedKey prev = null;
- for (Row row : rows)
- {
- if (prev != null && prev.compareTo(row.key) > 0)
- return false;
- prev = row.key;
- }
- return true;
- }
-
- protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
- {
- for (int i = 0; i < rowsPerSSTable; i++)
- {
- String key = String.valueOf(i);
- // create a row and update the birthdate value, test that the index query fetches the new version
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF);
- cf.addColumn(column("c1", "1", 1L));
- cf.addColumn(column("c2", "2", 1L));
- RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(key), cf);
- rm.applyUnsafe();
- }
-
- cfs.forceBlockingFlush();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index f87a0f8..267353f 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.CallbackInfo;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -48,7 +49,7 @@ public class SerializationsTest extends AbstractSerializationsTester
Statics statics = new Statics();
@BeforeClass
- public static void loadSchema() throws IOException
+ public static void loadSchema() throws IOException, ConfigurationException
{
loadSchema(true);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index f066b9b..9df5d25 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -147,8 +147,7 @@ public class CompactionsTest extends SchemaLoader
// check that the shadowed column is gone
SSTableReader sstable = cfs.getSSTables().iterator().next();
- SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()));
- scanner.seekTo(key);
+ SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()), key);
OnDiskAtomIterator iter = scanner.next();
assertEquals(key, iter.getKey());
assert iter.next() instanceof RangeTombstone;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index f1e689e..cce32df 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import com.google.common.base.Objects;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -55,24 +56,24 @@ public class LazilyCompactedRowTest extends SchemaLoader
// compare eager and lazy compactions
AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
strategy.getScanners(sstables),
- new PreCompactingController(cfs, sstables, gcBefore, false));
+ new PreCompactingController(cfs, sstables, gcBefore));
AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
strategy.getScanners(sstables),
- new LazilyCompactingController(cfs, sstables, gcBefore, false));
- assertBytes(cfs, sstables, eager, lazy);
+ new LazilyCompactingController(cfs, sstables, gcBefore));
+ assertBytes(cfs, eager, lazy);
// compare eager and parallel-lazy compactions
eager = new CompactionIterable(OperationType.UNKNOWN,
strategy.getScanners(sstables),
- new PreCompactingController(cfs, sstables, gcBefore, false));
+ new PreCompactingController(cfs, sstables, gcBefore));
AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
strategy.getScanners(sstables),
new CompactionController(cfs, new HashSet<SSTableReader>(sstables), gcBefore),
0);
- assertBytes(cfs, sstables, eager, parallel);
+ assertBytes(cfs, eager, parallel);
}
- private static void assertBytes(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
+ private static void assertBytes(ColumnFamilyStore cfs, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
{
CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
@@ -89,8 +90,8 @@ public class LazilyCompactedRowTest extends SchemaLoader
AbstractCompactedRow row2 = iter2.next();
DataOutputBuffer out1 = new DataOutputBuffer();
DataOutputBuffer out2 = new DataOutputBuffer();
- row1.write(out1);
- row2.write(out2);
+ row1.write(-1, out1);
+ row2.write(-1, out2);
File tmpFile1 = File.createTempFile("lcrt1", null);
File tmpFile2 = File.createTempFile("lcrt2", null);
@@ -104,28 +105,23 @@ public class LazilyCompactedRowTest extends SchemaLoader
MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0);
MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0);
- // key isn't part of what CompactedRow writes, that's done by SSTW.append
-
- // row size can differ b/c of bloom filter counts being different
- long rowSize1 = in1.readLong();
- long rowSize2 = in2.readLong();
- assertEquals(rowSize1 + 8, out1.getLength());
- assertEquals(rowSize2 + 8, out2.getLength());
+ // row key
+ assertEquals(ByteBufferUtil.readWithShortLength(in1), ByteBufferUtil.readWithShortLength(in2));
// cf metadata
ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
cf1.delete(DeletionInfo.serializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT));
cf2.delete(DeletionInfo.serializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT));
- assert cf1.deletionInfo().equals(cf2.deletionInfo());
+ assertEquals(cf1.deletionInfo(), cf2.deletionInfo());
// columns
- int columns = in1.readInt();
- assert columns == in2.readInt();
- for (int i = 0; i < columns; i++)
+ while (true)
{
Column c1 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT);
Column c2 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT);
- assert c1.equals(c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
+ assert Objects.equal(c1, c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
+ if (c1 == null)
+ break;
}
// that should be everything
assert in1.available() == 0;
@@ -137,8 +133,8 @@ public class LazilyCompactedRowTest extends SchemaLoader
{
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
Collection<SSTableReader> sstables = cfs.getSSTables();
- AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false));
- AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore, false));
+ AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore));
+ AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore));
CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
@@ -291,7 +287,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
private static class LazilyCompactingController extends CompactionController
{
- public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
+ public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
{
super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
}
@@ -305,7 +301,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
private static class PreCompactingController extends CompactionController
{
- public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
+ public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
{
super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
deleted file mode 100644
index 135f444..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.io.sstable;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.Util;
-
-public class SSTableTest extends SchemaLoader
-{
- @Test
- public void testSingleWrite() throws IOException
- {
- // write test data
- ByteBuffer key = ByteBufferUtil.bytes(Integer.toString(1));
- ByteBuffer cbytes = ByteBuffer.wrap(new byte[1024]);
- new Random().nextBytes(cbytes.array());
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
- cf.addColumn(new Column(cbytes, cbytes));
-
- SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>();
- map.put(Util.dk(key), cf);
- SSTableReader ssTable = SSTableUtils.prepare().cf("Standard1").write(map);
-
- // verify
- ByteBuffer bytes = Util.serializeForSSTable(cf);
- verifySingle(ssTable, bytes, key);
- ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
- verifySingle(ssTable, bytes, key);
- }
-
- private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws IOException
- {
- RandomAccessReader file = sstable.openDataReader();
- file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);
- assert key.equals(ByteBufferUtil.readWithShortLength(file));
- int size = (int) file.readLong();
- byte[] bytes2 = new byte[size];
- file.readFully(bytes2);
- assert ByteBuffer.wrap(bytes2).equals(bytes);
- }
-
- @Test
- public void testManyWrites() throws IOException
- {
- SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>();
- SortedMap<ByteBuffer, ByteBuffer> bytesMap = new TreeMap<ByteBuffer, ByteBuffer>();
- //for (int i = 100; i < 1000; ++i)
- for (int i = 100; i < 300; ++i)
- {
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard2");
- ByteBuffer bytes = ByteBufferUtil.bytes(("Avinash Lakshman is a good man: " + i));
- cf.addColumn(new Column(bytes, bytes));
- map.put(Util.dk(Integer.toString(i)), cf);
- bytesMap.put(ByteBufferUtil.bytes(Integer.toString(i)), Util.serializeForSSTable(cf));
- }
-
- // write
- SSTableReader ssTable = SSTableUtils.prepare().cf("Standard2").write(map);
-
- // verify
- verifyMany(ssTable, bytesMap);
- ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
- verifyMany(ssTable, bytesMap);
-
- Set<Component> live = SSTable.componentsFor(ssTable.descriptor);
- assert !live.isEmpty() : "SSTable has no live components";
- Set<Component> temp = SSTable.componentsFor(ssTable.descriptor.asTemporary(true));
- assert temp.isEmpty() : "SSTable has unexpected temp components";
- }
-
- private void verifyMany(SSTableReader sstable, Map<ByteBuffer, ByteBuffer> map) throws IOException
- {
- List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet());
- //Collections.shuffle(keys);
- RandomAccessReader file = sstable.openDataReader();
- for (ByteBuffer key : keys)
- {
- file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);
- assert key.equals( ByteBufferUtil.readWithShortLength(file));
- int size = (int) file.readLong();
- byte[] bytes2 = new byte[size];
- file.readFully(bytes2);
- assert Arrays.equals(bytes2, map.get(key).array());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 73abbe2..c3e83cb 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -50,7 +50,7 @@ public class LeaveAndBootstrapTest
private static IPartitioner oldPartitioner;
@BeforeClass
- public static void setup() throws IOException
+ public static void setup() throws IOException, ConfigurationException
{
oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
SchemaLoader.loadSchema();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index e30bbde..5454127 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -57,7 +57,7 @@ public class MoveTest
* So instead of extending SchemaLoader, we call its method below.
*/
@BeforeClass
- public static void setup() throws IOException
+ public static void setup() throws IOException, ConfigurationException
{
oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
SchemaLoader.loadSchema();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 278c8f1..4f3217c 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -65,7 +65,7 @@ public class RemoveTest
UUID removalId;
@BeforeClass
- public static void setupClass() throws IOException
+ public static void setupClass() throws IOException, ConfigurationException
{
oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
SchemaLoader.loadSchema();
[2/2] git commit: Single-pass compaction patch by jbellis;
reviewed by marcuse for CASSANDRA-4180
Posted by jb...@apache.org.
Single-pass compaction
patch by jbellis; reviewed by marcuse for CASSANDRA-4180
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9bd531b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9bd531b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9bd531b
Branch: refs/heads/trunk
Commit: a9bd531bc9a286cc8e81a800aaa29c1fead62dcd
Parents: 3ca160e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue May 7 14:20:53 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue May 7 14:21:23 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Column.java | 27 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/ColumnIndex.java | 46 +++-
src/java/org/apache/cassandra/db/OnDiskAtom.java | 5 +-
.../apache/cassandra/db/RowIteratorFactory.java | 3 +-
.../columniterator/ICountableColumnIterator.java | 25 --
.../db/columniterator/IndexedSliceReader.java | 29 +-
.../db/columniterator/SSTableNamesIterator.java | 10 +-
.../db/columniterator/SimpleSliceReader.java | 32 +--
.../db/compaction/AbstractCompactedRow.java | 18 +-
.../cassandra/db/compaction/CompactionManager.java | 53 ++--
.../cassandra/db/compaction/CompactionTask.java | 5 +-
.../db/compaction/LazilyCompactedRow.java | 111 ++------
.../db/compaction/ParallelCompactionIterable.java | 28 +--
.../cassandra/db/compaction/PrecompactedRow.java | 40 +---
.../apache/cassandra/db/compaction/Scrubber.java | 49 ++--
.../apache/cassandra/io/sstable/Descriptor.java | 3 +
.../io/sstable/SSTableBoundedScanner.java | 95 ------
.../io/sstable/SSTableIdentityIterator.java | 99 ++-----
.../apache/cassandra/io/sstable/SSTableReader.java | 13 +-
.../cassandra/io/sstable/SSTableScanner.java | 184 +++++-------
.../apache/cassandra/io/sstable/SSTableWriter.java | 134 ++++-----
.../cassandra/streaming/IncomingStreamReader.java | 33 +--
.../org/apache/cassandra/tools/SSTableExport.java | 54 ++--
test/unit/org/apache/cassandra/SchemaLoader.java | 21 +-
test/unit/org/apache/cassandra/Util.java | 18 --
test/unit/org/apache/cassandra/db/ScrubTest.java | 222 ---------------
.../apache/cassandra/db/SerializationsTest.java | 3 +-
.../cassandra/db/compaction/CompactionsTest.java | 3 +-
.../cassandra/io/LazilyCompactedRowTest.java | 44 ++--
.../apache/cassandra/io/sstable/SSTableTest.java | 112 --------
.../cassandra/service/LeaveAndBootstrapTest.java | 2 +-
.../org/apache/cassandra/service/MoveTest.java | 2 +-
.../org/apache/cassandra/service/RemoveTest.java | 2 +-
35 files changed, 424 insertions(+), 1106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60accb1..4cd7b6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0
+ * Single-pass compaction (CASSANDRA-4180)
* Removed token range bisection (CASSANDRA-5518)
* Removed compatibility with pre-1.2.5 sstables and network messages
(CASSANDRA-5511)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/Column.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Column.java b/src/java/org/apache/cassandra/db/Column.java
index bf4f5b3..a8c1f94 100644
--- a/src/java/org/apache/cassandra/db/Column.java
+++ b/src/java/org/apache/cassandra/db/Column.java
@@ -27,6 +27,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import com.google.common.collect.AbstractIterator;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -50,33 +52,34 @@ public class Column implements OnDiskAtom
return OnDiskAtom.Serializer.instance;
}
+ /**
+ * For 2.0-formatted sstables (where column count is not stored), @param count should be Integer.MAX_VALUE,
+ * and we will look for the end-of-row column name marker instead of relying on that.
+ */
public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in, final int count, final ColumnSerializer.Flag flag, final int expireBefore, final Descriptor.Version version)
{
- return new Iterator<OnDiskAtom>()
+ return new AbstractIterator<OnDiskAtom>()
{
int i = 0;
- public boolean hasNext()
+ protected OnDiskAtom computeNext()
{
- return i < count;
- }
+ if (i++ >= count)
+ return endOfData();
- public OnDiskAtom next()
- {
- ++i;
+ OnDiskAtom atom;
try
{
- return onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
+ atom = onDiskSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
}
catch (IOException e)
{
throw new IOError(e);
}
- }
+ if (atom == null)
+ return endOfData();
- public void remove()
- {
- throw new UnsupportedOperationException();
+ return atom;
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b07d97c..bbbb554 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1480,6 +1480,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
+ // todo this could be pushed into SSTableScanner
return new AbstractScanIterator()
{
protected Row computeNext()
@@ -1498,7 +1499,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (!range.contains(key))
return computeNext();
- logger.trace("scanned {}", key);
+ if (logger.isTraceEnabled())
+ logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
return current;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 57bd995..bcc5c2f 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.utils.ByteBufferUtil;
public class ColumnIndex
{
@@ -65,13 +66,21 @@ public class ColumnIndex
private final DataOutput output;
private final RangeTombstone.Tracker tombstoneTracker;
private int atomCount;
+ private final ByteBuffer key;
+ private final DeletionInfo deletionInfo;
public Builder(ColumnFamily cf,
ByteBuffer key,
DataOutput output,
boolean fromStream)
{
- this.indexOffset = rowHeaderSize(key, cf.deletionInfo());
+ assert cf != null;
+ assert key != null;
+ assert output != null;
+
+ this.key = key;
+ deletionInfo = cf.deletionInfo();
+ this.indexOffset = rowHeaderSize(key, deletionInfo);
this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>());
this.output = output;
this.tombstoneTracker = fromStream ? null : new RangeTombstone.Tracker(cf.getComparator());
@@ -94,9 +103,7 @@ public class ColumnIndex
// TODO fix constantSize when changing the nativeconststs.
int keysize = key.remaining();
return typeSizes.sizeof((short) keysize) + keysize // Row key
- + typeSizes.sizeof(0L) // Row data size
- + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes)
- + typeSizes.sizeof(0); // Column count
+ + DeletionTime.serializer.serializedSize(delInfo.getTopLevelDeletion(), typeSizes);
}
public RangeTombstone.Tracker tombstoneTracker()
@@ -119,6 +126,7 @@ public class ColumnIndex
*/
public ColumnIndex build(ColumnFamily cf) throws IOException
{
+ // cf has disentangled the columns and range tombstones, we need to re-interleave them in comparator order
Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
Comparator<ByteBuffer> comparator = cf.getComparator();
@@ -138,15 +146,22 @@ public class ColumnIndex
add(tombstone);
tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
}
- return build();
+ ColumnIndex index = build();
+
+ finish();
+
+ return index;
}
public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
{
for (OnDiskAtom c : columns)
add(c);
+ ColumnIndex index = build();
+
+ finish();
- return build();
+ return index;
}
public void add(OnDiskAtom column) throws IOException
@@ -177,8 +192,8 @@ public class ColumnIndex
lastBlockClosing = column;
}
- if (output != null)
- atomSerializer.serializeForSSTable(column, output);
+ maybeWriteRowHeader();
+ atomSerializer.serializeForSSTable(column, output);
// TODO: Should deal with removing unneeded tombstones
if (tombstoneTracker != null)
@@ -187,6 +202,15 @@ public class ColumnIndex
lastColumn = column;
}
+ private void maybeWriteRowHeader() throws IOException
+ {
+ if (lastColumn == null)
+ {
+ ByteBufferUtil.writeWithShortLength(key, output);
+ DeletionInfo.serializer().serializeForSSTable(deletionInfo, output);
+ }
+ }
+
public ColumnIndex build()
{
// all columns were GC'd after all
@@ -204,5 +228,11 @@ public class ColumnIndex
assert result.columnsIndex.size() > 0;
return result;
}
+
+ public void finish() throws IOException
+ {
+ if (!deletionInfo.equals(DeletionInfo.LIVE))
+ maybeWriteRowHeader();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
index 7cf7914..2fdb7ad 100644
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ b/src/java/org/apache/cassandra/db/OnDiskAtom.java
@@ -73,7 +73,10 @@ public interface OnDiskAtom
{
ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
if (name.remaining() <= 0)
- throw ColumnSerializer.CorruptColumnException.create(in, name);
+ {
+ // SSTableWriter.END_OF_ROW
+ return null;
+ }
int b = in.readUnsignedByte();
if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 1bc506b..4588146 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -69,8 +69,7 @@ public class RowIteratorFactory
for (SSTableReader sstable : sstables)
{
- final SSTableScanner scanner = sstable.getScanner(filter);
- scanner.seekTo(startWith);
+ final SSTableScanner scanner = sstable.getScanner(filter, startWith);
iterators.add(scanner);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java b/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
deleted file mode 100644
index 0b5fa97..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-public interface ICountableColumnIterator extends OnDiskAtomIterator
-{
- public int getColumnCount();
-
- public void reset();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index ffbca1a..ccf9149 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -74,16 +74,15 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
{
Descriptor.Version version = sstable.descriptor.version;
this.indexes = indexEntry.columnsIndex();
+ emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
if (indexes.isEmpty())
{
- setToRowStart(sstable, indexEntry, input);
- this.emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
+ setToRowStart(indexEntry, input);
emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
fetcher = new SimpleBlockFetcher();
}
else
{
- emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
emptyColumnFamily.delete(indexEntry.deletionTime());
fetcher = new IndexedBlockFetcher(indexEntry.position);
}
@@ -98,19 +97,20 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
/**
* Sets the seek position to the start of the row for column scanning.
*/
- private void setToRowStart(SSTableReader reader, RowIndexEntry indexEntry, FileDataInput input) throws IOException
+ private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
{
- if (input == null)
+ if (in == null)
{
- this.file = sstable.getFileDataInput(indexEntry.position);
+ this.file = sstable.getFileDataInput(rowEntry.position);
}
else
{
- this.file = input;
- input.seek(indexEntry.position);
+ this.file = in;
+ in.seek(rowEntry.position);
}
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
- file.readLong();
+ if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+ file.readLong();
}
public ColumnFamily getColumnFamily()
@@ -210,7 +210,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
private class IndexedBlockFetcher extends BlockFetcher
{
// where this row starts
- private final long basePosition;
+ private final long columnsStart;
// the index entry for the next block to deserialize
private int nextIndexIdx = -1;
@@ -222,10 +222,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
// may still match a slice
private final Deque<OnDiskAtom> prefetched;
- public IndexedBlockFetcher(long basePosition)
+ public IndexedBlockFetcher(long columnsStart)
{
super(-1);
- this.basePosition = basePosition;
+ this.columnsStart = columnsStart;
this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
setNextSlice();
}
@@ -330,7 +330,7 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
/* seek to the correct offset to the data, and calculate the data size */
- long positionToSeek = basePosition + currentIndex.offset;
+ long positionToSeek = columnsStart + currentIndex.offset;
// With new promoted indexes, our first seek in the data file will happen at that point.
if (file == null)
@@ -420,7 +420,8 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
// We remember when we are within a slice to avoid some comparison
boolean inSlice = false;
- Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+ int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+ Iterator<OnDiskAtom> atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
while (atomIterator.hasNext())
{
OnDiskAtom column = atomIterator.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index bc0af30..f0e6dc7 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -115,7 +115,8 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
- file.readLong();
+ if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+ file.readLong();
}
indexList = indexEntry.columnsIndex();
@@ -142,7 +143,8 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
if (indexList.isEmpty())
{
- readSimpleColumns(file, columns, result);
+ int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+ readSimpleColumns(file, columns, result, columnCount);
}
else
{
@@ -153,9 +155,9 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
iter = result.iterator();
}
- private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result) throws IOException
+ private void readSimpleColumns(FileDataInput file, SortedSet<ByteBuffer> columnNames, List<OnDiskAtom> result, int columnCount) throws IOException
{
- Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, file.readInt(), sstable.descriptor.version);
+ Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
int n = 0;
while (atomIterator.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index efbb92c..d0d74c6 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import com.google.common.collect.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -30,21 +32,22 @@ import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.utils.ByteBufferUtil;
class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
{
+ private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class);
+
private final FileDataInput file;
private final boolean needsClosing;
private final ByteBuffer finishColumn;
private final AbstractType<?> comparator;
private final ColumnFamily emptyColumnFamily;
- private FileMark mark;
private final Iterator<OnDiskAtom> atomIterator;
public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn)
{
+ logger.debug("Slicing {}", sstable);
this.finishColumn = finishColumn;
this.comparator = sstable.metadata.comparator;
try
@@ -61,16 +64,17 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
this.needsClosing = false;
}
+ Descriptor.Version version = sstable.descriptor.version;
+
// Skip key and data size
ByteBufferUtil.skipShortLength(file);
- file.readLong();
-
- Descriptor.Version version = sstable.descriptor.version;
+ if (version.hasRowSizeAndColumnCount)
+ file.readLong();
emptyColumnFamily = EmptyColumns.factory.create(sstable.metadata);
- emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, version));
- atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, file.readInt(), version);
- mark = file.mark();
+ emptyColumnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(file, sstable.descriptor.version));
+ int columnCount = version.hasRowSizeAndColumnCount ? file.readInt() : Integer.MAX_VALUE;
+ atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, columnCount, sstable.descriptor.version);
}
catch (IOException e)
{
@@ -84,20 +88,10 @@ class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAt
if (!atomIterator.hasNext())
return endOfData();
- OnDiskAtom column;
- try
- {
- file.reset(mark);
- column = atomIterator.next();
- }
- catch (IOException e)
- {
- throw new CorruptSSTableException(e, file.getPath());
- }
+ OnDiskAtom column = atomIterator.next();
if (finishColumn.remaining() > 0 && comparator.compare(column.name(), finishColumn) > 0)
return endOfData();
- mark = file.mark();
return column;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 56e9353..cd2952a 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.security.MessageDigest;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.db.DeletionInfo;
import org.apache.cassandra.db.ColumnIndex;
@@ -48,7 +49,7 @@ public abstract class AbstractCompactedRow implements Closeable
*
* write() may change internal state; it is NOT valid to call write() or update() a second time.
*/
- public abstract long write(DataOutput out) throws IOException;
+ public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
/**
* update @param digest with the data bytes of the row (not including row key or row size).
@@ -59,23 +60,8 @@ public abstract class AbstractCompactedRow implements Closeable
public abstract void update(MessageDigest digest);
/**
- * @return true if there are no columns in the row AND there are no row-level tombstones to be preserved
- */
- public abstract boolean isEmpty();
-
- /**
* @return aggregate information about the columns in this row. Some fields may
* contain default values if computing them value would require extra effort we're not willing to make.
*/
public abstract ColumnStats columnStats();
-
- /**
- * @return the compacted row deletion infos.
- */
- public abstract DeletionInfo deletionInfo();
-
- /**
- * @return the column index for this row.
- */
- public abstract ColumnIndex index();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 877d349..758068c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -482,9 +482,6 @@ public class CompactionManager implements CompactionManagerMBean
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
- SSTableWriter writer = null;
- SSTableReader newSstable = null;
-
logger.info("Cleaning up " + sstable);
// Calculate the expected compacted filesize
long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable), OperationType.CLEANUP);
@@ -498,6 +495,11 @@ public class CompactionManager implements CompactionManagerMBean
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
+ SSTableWriter writer = createWriter(cfs,
+ compactionFileLocation,
+ expectedBloomFilterSize,
+ sstable);
+ SSTableReader newSstable = null;
try
{
while (scanner.hasNext())
@@ -508,11 +510,8 @@ public class CompactionManager implements CompactionManagerMBean
if (Range.isInRanges(row.getKey().token, ranges))
{
AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
- continue;
- writer = maybeCreateWriter(cfs, OperationType.CLEANUP, compactionFileLocation, expectedBloomFilterSize, writer, sstable);
- writer.append(compactedRow);
- totalkeysWritten++;
+ if (writer.append(compactedRow) != null)
+ totalkeysWritten++;
}
else
{
@@ -553,13 +552,14 @@ public class CompactionManager implements CompactionManagerMBean
}
}
}
- if (writer != null)
+ if (totalkeysWritten > 0)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+ else
+ writer.abort();
}
catch (Throwable e)
{
- if (writer != null)
- writer.abort();
+ writer.abort();
throw Throwables.propagate(e);
}
finally
@@ -589,23 +589,17 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs,
- OperationType compactionType,
- File compactionFileLocation,
- int expectedBloomFilterSize,
- SSTableWriter writer,
- SSTableReader sstable)
+ public static SSTableWriter createWriter(ColumnFamilyStore cfs,
+ File compactionFileLocation,
+ int expectedBloomFilterSize,
+ SSTableReader sstable)
{
- if (writer == null)
- {
- FileUtils.createDirectory(compactionFileLocation);
- writer = new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
- expectedBloomFilterSize,
- cfs.metadata,
- cfs.partitioner,
- SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
- }
- return writer;
+ FileUtils.createDirectory(compactionFileLocation);
+ return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
+ expectedBloomFilterSize,
+ cfs.metadata,
+ cfs.partitioner,
+ SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
}
/**
@@ -661,10 +655,7 @@ public class CompactionManager implements CompactionManagerMBean
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
AbstractCompactedRow row = iter.next();
- if (row.isEmpty())
- row.close();
- else
- validator.add(row);
+ validator.add(row);
}
validator.complete();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3b4fdc8..e32956b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -156,7 +156,8 @@ public class CompactionTask extends AbstractCompactionTask
throw new CompactionInterruptedException(ci.getCompactionInfo());
AbstractCompactedRow row = iter.next();
- if (row.isEmpty())
+ RowIndexEntry indexEntry = writer.append(row);
+ if (indexEntry == null)
{
controller.invalidateCachedRow(row.key);
row.close();
@@ -166,8 +167,6 @@ public class CompactionTask extends AbstractCompactionTask
// If the row is cached, we call removeDeleted on at read time it to have coherent query returns,
// but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
controller.removeDeletedInCache(row.key);
-
- RowIndexEntry indexEntry = writer.append(row);
totalkeysWritten++;
if (DatabaseDescriptor.getPreheatKeyCache())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index b799f0d..444ee11 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -29,13 +29,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.StreamingHistogram;
@@ -45,35 +46,30 @@ import org.apache.cassandra.utils.StreamingHistogram;
* of the rows being compacted, and merging them as it does so. So the most we have
* in memory at a time is the bloom filter, the index, and one column from each
* pre-compaction row.
- *
- * When write() or update() is called, a second pass is made over the pre-compaction
- * rows to write the merged columns or update the hash, again with at most one column
- * from each row deserialized at a time.
*/
public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable<OnDiskAtom>
{
private static Logger logger = LoggerFactory.getLogger(LazilyCompactedRow.class);
- private final List<? extends ICountableColumnIterator> rows;
+ private final List<? extends OnDiskAtomIterator> rows;
private final CompactionController controller;
private final boolean shouldPurge;
private ColumnFamily emptyColumnFamily;
private Reducer reducer;
- private final ColumnStats columnStats;
- private long columnSerializedSize;
+ private ColumnStats columnStats;
private boolean closed;
private ColumnIndex.Builder indexBuilder;
- private ColumnIndex columnsIndex;
private final SecondaryIndexManager.Updater indexer;
+ private long maxDelTimestamp;
- public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
+ public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
{
super(rows.get(0).getKey());
this.rows = rows;
this.controller = controller;
indexer = controller.cfs.indexManager.updaterFor(key);
- long maxDelTimestamp = Long.MIN_VALUE;
+ maxDelTimestamp = Long.MIN_VALUE;
for (OnDiskAtomIterator row : rows)
{
ColumnFamily cf = row.getColumnFamily();
@@ -85,58 +81,45 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
emptyColumnFamily.delete(cf);
}
this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+ }
+
+ public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+ {
+ assert !closed;
+ ColumnIndex columnsIndex;
try
{
- indexAndWrite(null);
+ indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
+ columnsIndex = indexBuilder.build(this);
+ if (columnsIndex.columnsIndex.isEmpty())
+ {
+ boolean cfIrrelevant = shouldPurge
+ ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
+ : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
+ if (cfIrrelevant)
+ return null;
+ }
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- // reach into the reducer used during iteration to get column count, size, max column timestamp
+ // reach into the reducer (created during iteration) to get column count, size, max column timestamp
// (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
- columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
- reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
+ columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
+ reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
);
- columnSerializedSize = reducer == null ? 0 : reducer.serializedSize;
reducer = null;
- }
-
- private void indexAndWrite(DataOutput out) throws IOException
- {
- this.indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
- this.columnsIndex = indexBuilder.build(this);
- }
-
- public long write(DataOutput out) throws IOException
- {
- assert !closed;
-
- DataOutputBuffer clockOut = new DataOutputBuffer();
- DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), clockOut);
-
- long dataSize = clockOut.getLength() + columnSerializedSize;
- if (logger.isDebugEnabled())
- logger.debug(String.format("clock / column sizes are %s / %s", clockOut.getLength(), columnSerializedSize));
- assert dataSize > 0;
- out.writeLong(dataSize);
- out.write(clockOut.getData(), 0, clockOut.getLength());
- out.writeInt(indexBuilder.writtenAtomCount());
- // We rebuild the column index uselessly, but we need to do that because range tombstone markers depend
- // on indexing. If we're able to remove the two-phase compaction, we'll avoid that.
- indexAndWrite(out);
-
- long secondPassColumnSize = reducer == null ? 0 : reducer.serializedSize;
- assert secondPassColumnSize == columnSerializedSize
- : "originally calculated column size of " + columnSerializedSize + " but now it is " + secondPassColumnSize;
+ out.writeShort(SSTableWriter.END_OF_ROW);
close();
- return dataSize;
+
+ return RowIndexEntry.create(currentPosition, emptyColumnFamily.deletionInfo().getTopLevelDeletion(), columnsIndex);
}
public void update(MessageDigest digest)
@@ -150,7 +133,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
try
{
DeletionInfo.serializer().serializeForSSTable(emptyColumnFamily.deletionInfo(), out);
- out.writeInt(columnStats.columnCount);
digest.update(out.getData(), 0, out.getLength());
}
catch (IOException e)
@@ -158,30 +140,14 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
throw new AssertionError(e);
}
+ // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
+ indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
Iterator<OnDiskAtom> iter = iterator();
while (iter.hasNext())
- {
iter.next().updateDigest(digest);
- }
close();
}
- public boolean isEmpty()
- {
- boolean cfIrrelevant = shouldPurge
- ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
- : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
- return cfIrrelevant && columnStats.columnCount == 0;
- }
-
- public int getEstimatedColumnCount()
- {
- int n = 0;
- for (ICountableColumnIterator row : rows)
- n += row.getColumnCount();
- return n;
- }
-
public AbstractType<?> getComparator()
{
return emptyColumnFamily.getComparator();
@@ -189,8 +155,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
public Iterator<OnDiskAtom> iterator()
{
- for (ICountableColumnIterator row : rows)
- row.reset();
reducer = new Reducer();
Iterator<OnDiskAtom> iter = MergeIterator.get(rows, getComparator().onDiskAtomComparator, reducer);
return Iterators.filter(iter, Predicates.notNull());
@@ -217,19 +181,6 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
closed = true;
}
- public DeletionInfo deletionInfo()
- {
- return emptyColumnFamily.deletionInfo();
- }
-
- /**
- * @return the column index for this row.
- */
- public ColumnIndex index()
- {
- return columnsIndex;
- }
-
private class Reducer extends MergeIterator.Reducer<OnDiskAtom, OnDiskAtom>
{
// all columns reduced together will have the same name, so there will only be one column
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 718b1e3..c0bce30 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.utils.*;
@@ -164,7 +164,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(rawRows)));
}
- List<ICountableColumnIterator> iterators = new ArrayList<ICountableColumnIterator>(rows.size());
+ List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(rows.size());
for (RowContainer container : rows)
iterators.add(container.row == null ? container.wrapper : new DeserializedColumnIterator(container.row));
return new CompactedRowContainer(new LazilyCompactedRow(controller, iterators));
@@ -203,7 +203,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
}
}
- private class DeserializedColumnIterator implements ICountableColumnIterator
+ private class DeserializedColumnIterator implements OnDiskAtomIterator
{
private final Row row;
private Iterator<Column> iter;
@@ -224,16 +224,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
return row.key;
}
- public int getColumnCount()
- {
- return row.cf.getColumnCount();
- }
-
- public void reset()
- {
- iter = row.cf.iterator();
- }
-
public void close() throws IOException {}
public boolean hasNext()
@@ -319,7 +309,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
/**
* a wrapper around SSTII that notifies the given condition when it is closed
*/
- private static class NotifyingSSTableIdentityIterator implements ICountableColumnIterator
+ private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
{
private final SSTableIdentityIterator wrapped;
private final Condition condition;
@@ -340,16 +330,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
return wrapped.getKey();
}
- public int getColumnCount()
- {
- return wrapped.getColumnCount();
- }
-
- public void reset()
- {
- wrapped.reset();
- }
-
public void close() throws IOException
{
wrapped.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 1e7285a..1b5d4a9 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.io.sstable.ColumnStats;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +43,6 @@ import org.apache.cassandra.utils.MergeIterator;
public class PrecompactedRow extends AbstractCompactedRow
{
private final ColumnFamily compactedCf;
- private ColumnIndex columnIndex;
/** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
@@ -152,21 +152,12 @@ public class PrecompactedRow extends AbstractCompactedRow
filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC);
}
- public long write(DataOutput out) throws IOException
+ public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
{
- assert compactedCf != null;
- DataOutputBuffer buffer = new DataOutputBuffer();
- ColumnIndex.Builder builder = new ColumnIndex.Builder(compactedCf, key.key, buffer);
- columnIndex = builder.build(compactedCf);
-
- TypeSizes typeSizes = TypeSizes.NATIVE;
- long delSize = DeletionTime.serializer.serializedSize(compactedCf.deletionInfo().getTopLevelDeletion(), typeSizes);
- long dataSize = buffer.getLength() + delSize + typeSizes.sizeof(0);
- out.writeLong(dataSize);
- DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), out);
- out.writeInt(builder.writtenAtomCount());
- out.write(buffer.getData(), 0, buffer.getLength());
- return dataSize;
+ if (compactedCf == null)
+ return null;
+
+ return SSTableWriter.rawAppend(compactedCf, currentPosition, key, out);
}
public void update(MessageDigest digest)
@@ -176,7 +167,6 @@ public class PrecompactedRow extends AbstractCompactedRow
try
{
DeletionInfo.serializer().serializeForSSTable(compactedCf.deletionInfo(), buffer);
- buffer.writeInt(compactedCf.getColumnCount());
digest.update(buffer.getData(), 0, buffer.getLength());
}
catch (IOException e)
@@ -186,11 +176,6 @@ public class PrecompactedRow extends AbstractCompactedRow
compactedCf.updateDigest(digest);
}
- public boolean isEmpty()
- {
- return compactedCf == null;
- }
-
public ColumnStats columnStats()
{
return compactedCf.getColumnStats();
@@ -207,18 +192,5 @@ public class PrecompactedRow extends AbstractCompactedRow
return compactedCf;
}
- public DeletionInfo deletionInfo()
- {
- return compactedCf.deletionInfo();
- }
-
- /**
- * @return the column index for this row.
- */
- public ColumnIndex index()
- {
- return columnIndex;
- }
-
public void close() { }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 421385a..5809741 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -111,7 +111,7 @@ public class Scrubber implements Closeable
}
// TODO errors when creating the writer may leave empty temp files.
- writer = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
+ writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
AbstractCompactedRow prevRow = null;
@@ -168,25 +168,21 @@ public class Scrubber implements Closeable
throw new IOError(new IOException("Unable to read row key from data file"));
if (dataSize > dataFile.length())
throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
+ if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
{
- emptyRows++;
+ outOfOrderRows.add(compactedRow);
+ outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ continue;
}
- else
- {
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
- {
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
- continue;
- }
- writer.append(compactedRow);
- prevRow = compactedRow;
+ if (writer.append(compactedRow) == null)
+ emptyRows++;
+ else
goodRows++;
- }
+ prevRow = compactedRow;
if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
outputHandler.warn("Index file contained a different key or row size; using key from data file");
}
@@ -204,24 +200,19 @@ public class Scrubber implements Closeable
key = sstable.partitioner.decorateKey(currentIndexKey);
try
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
+ if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
{
- emptyRows++;
+ outOfOrderRows.add(compactedRow);
+ outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ continue;
}
+ if (writer.append(compactedRow) == null)
+ emptyRows++;
else
- {
- if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
- {
- outOfOrderRows.add(compactedRow);
- outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
- continue;
- }
- writer.append(compactedRow);
- prevRow = compactedRow;
goodRows++;
- }
+ prevRow = compactedRow;
}
catch (Throwable th2)
{
@@ -266,7 +257,7 @@ public class Scrubber implements Closeable
if (!outOfOrderRows.isEmpty())
{
- SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
+ SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
for (AbstractCompactedRow row : outOfOrderRows)
inOrderWriter.append(row);
newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index e2a7a18..fdc99b9 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -53,6 +53,7 @@ public class Descriptor
// into this new format)
// tracks max local deletiontime in sstable metadata
// records bloom_filter_fp_chance in metadata component
+ // remove data size and column count from data file (CASSANDRA-4180)
public static final Version CURRENT = new Version(current_version);
@@ -63,6 +64,7 @@ public class Descriptor
public final boolean tracksMaxLocalDeletionTime;
public final boolean hasBloomFilterFPChance;
public final boolean offHeapSummaries;
+ public final boolean hasRowSizeAndColumnCount;
public Version(String version)
{
@@ -72,6 +74,7 @@ public class Descriptor
hasSuperColumns = version.compareTo("ja") < 0;
hasBloomFilterFPChance = version.compareTo("ja") >= 0;
offHeapSummaries = version.compareTo("ja") >= 0;
+ hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
deleted file mode 100644
index 4febea6..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * A SSTableScanner that only reads key in a given range (for validation compaction).
- */
-public class SSTableBoundedScanner extends SSTableScanner
-{
- private final Iterator<Pair<Long, Long>> rangeIterator;
- private Pair<Long, Long> currentRange;
-
- SSTableBoundedScanner(SSTableReader sstable, Iterator<Pair<Long, Long>> rangeIterator, RateLimiter limiter)
- {
- super(sstable, limiter);
- assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
- this.rangeIterator = rangeIterator;
- currentRange = rangeIterator.next();
- dfile.seek(currentRange.left);
- }
-
- /*
- * This shouldn't be used with a bounded scanner as it could put the
- * bounded scanner outside it's range.
- */
- @Override
- public void seekTo(RowPosition seekKey)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean hasNext()
- {
- if (iterator == null)
- iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
- return iterator.hasNext();
- }
-
- @Override
- public OnDiskAtomIterator next()
- {
- if (iterator == null)
- iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : new BoundedKeyScanningIterator();
- return iterator.next();
- }
-
- protected class BoundedKeyScanningIterator extends KeyScanningIterator
- {
- @Override
- public boolean hasNext()
- {
- if (!super.hasNext())
- return false;
-
- if (finishedAt < currentRange.right)
- return true;
-
- if (rangeIterator.hasNext())
- {
- currentRange = rangeIterator.next();
- finishedAt = currentRange.left; // next() will seek for us
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 8e636c5..d35e14d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -25,30 +25,25 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.BytesReadTracker;
-public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, ICountableColumnIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, OnDiskAtomIterator
{
private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class);
private final DecoratedKey key;
- private final DataInput input;
- private final long dataStart;
- public final long dataSize;
+ private final DataInput in;
+ public final long dataSize; // we [still] require this so compaction can tell if it's safe to read the row into memory
public final ColumnSerializer.Flag flag;
private final ColumnFamily columnFamily;
private final int columnCount;
- private final long columnPosition;
private final Iterator<OnDiskAtom> atomIterator;
private final Descriptor.Version dataVersion;
- private final BytesReadTracker inputWithTracker; // tracks bytes read
-
// Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
private final int expireBefore;
@@ -60,13 +55,12 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
* @param sstable SSTable we are reading from.
* @param file Reading using this file.
* @param key Key of this row.
- * @param dataStart Data for this row starts at this pos.
* @param dataSize length of row data
* @throws IOException
*/
- public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize)
+ public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize)
{
- this(sstable, file, key, dataStart, dataSize, false);
+ this(sstable, file, key, dataSize, false);
}
/**
@@ -74,39 +68,29 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
* @param sstable SSTable we are reading from.
* @param file Reading using this file.
* @param key Key of this row.
- * @param dataStart Data for this row starts at this pos.
* @param dataSize length of row data
* @param checkData if true, do its best to deserialize and check the coherence of row data
*/
- public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataStart, long dataSize, boolean checkData)
+ public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
{
- this(sstable.metadata, file, file.getPath(), key, dataStart, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
- }
-
- // Must only be used against current file format
- public SSTableIdentityIterator(CFMetaData metadata, DataInput file, String filename, DecoratedKey key, long dataStart, long dataSize, ColumnSerializer.Flag flag)
- {
- this(metadata, file, filename, key, dataStart, dataSize, false, null, flag);
+ this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
}
// sstable may be null *if* checkData is false
// If it is null, we assume the data is in the current file format
private SSTableIdentityIterator(CFMetaData metadata,
- DataInput input,
+ DataInput in,
String filename,
DecoratedKey key,
- long dataStart,
long dataSize,
boolean checkData,
SSTableReader sstable,
ColumnSerializer.Flag flag)
{
assert !checkData || (sstable != null);
- this.input = input;
+ this.in = in;
this.filename = filename;
- this.inputWithTracker = new BytesReadTracker(input);
this.key = key;
- this.dataStart = dataStart;
this.dataSize = dataSize;
this.expireBefore = (int)(System.currentTimeMillis() / 1000);
this.flag = flag;
@@ -115,21 +99,10 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
try
{
- if (input instanceof RandomAccessReader)
- {
- RandomAccessReader file = (RandomAccessReader) input;
- file.seek(this.dataStart);
- if (dataStart + dataSize > file.length())
- throw new IOException(String.format("dataSize of %s starting at %s would be larger than file %s length %s",
- dataSize, dataStart, file.getPath(), file.length()));
- }
-
columnFamily = EmptyColumns.factory.create(metadata);
- columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(inputWithTracker, dataVersion));
-
- columnCount = inputWithTracker.readInt();
- atomIterator = columnFamily.metadata().getOnDiskIterator(inputWithTracker, columnCount, dataVersion);
- columnPosition = dataStart + inputWithTracker.getBytesRead();
+ columnFamily.delete(DeletionInfo.serializer().deserializeFromSSTable(in, dataVersion));
+ columnCount = dataVersion.hasRowSizeAndColumnCount ? in.readInt() : Integer.MAX_VALUE;
+ atomIterator = columnFamily.metadata().getOnDiskIterator(in, columnCount, dataVersion);
}
catch (IOException e)
{
@@ -151,7 +124,18 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
public boolean hasNext()
{
- return inputWithTracker.getBytesRead() < dataSize;
+ try
+ {
+ return atomIterator.hasNext();
+ }
+ catch (IOError e)
+ {
+ // catch here b/c atomIterator is an AbstractIterator; hasNext reads the value
+ if (e.getCause() instanceof IOException)
+ throw new CorruptSSTableException((IOException)e.getCause(), filename);
+ else
+ throw e;
+ }
}
public OnDiskAtom next()
@@ -163,13 +147,6 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
atom.validateFields(columnFamily.metadata());
return atom;
}
- catch (IOError e)
- {
- if (e.getCause() instanceof IOException)
- throw new CorruptSSTableException((IOException)e.getCause(), filename);
- else
- throw e;
- }
catch (MarshalException me)
{
throw new CorruptSSTableException(me, filename);
@@ -189,9 +166,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
public String getPath()
{
// if input is from file, then return that path, otherwise it's from streaming
- if (input instanceof RandomAccessReader)
+ if (in instanceof RandomAccessReader)
{
- RandomAccessReader file = (RandomAccessReader) input;
+ RandomAccessReader file = (RandomAccessReader) in;
return file.getPath();
}
else
@@ -202,10 +179,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
public ColumnFamily getColumnFamilyWithColumns(ColumnFamily.Factory containerFactory)
{
- assert inputWithTracker.getBytesRead() == headerSize();
ColumnFamily cf = columnFamily.cloneMeShallow(containerFactory, false);
// since we already read column count, just pass that value and continue deserialization
- columnFamily.serializer.deserializeColumnsFromSSTable(inputWithTracker, cf, columnCount, flag, expireBefore, dataVersion);
+ columnFamily.serializer.deserializeColumnsFromSSTable(in, cf, columnCount, flag, expireBefore, dataVersion);
if (validateColumns)
{
try
@@ -220,28 +196,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
return cf;
}
- private long headerSize()
- {
- return columnPosition - dataStart;
- }
-
public int compareTo(SSTableIdentityIterator o)
{
return key.compareTo(o.key);
}
- public void reset()
- {
- if (!(input instanceof RandomAccessReader))
- throw new UnsupportedOperationException();
-
- RandomAccessReader file = (RandomAccessReader) input;
- file.seek(columnPosition);
- inputWithTracker.reset(headerSize());
- }
-
- public int getColumnCount()
- {
- return columnCount;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a09f65b..baba472 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1000,10 +1000,15 @@ public class SSTableReader extends SSTable
*/
public SSTableScanner getScanner(QueryFilter filter)
{
- return new SSTableScanner(this, filter);
+ return new SSTableScanner(this, filter, null);
}
- /**
+ public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
+ {
+ return new SSTableScanner(this, filter, startWith, null);
+ }
+
+ /**
* I/O SSTableScanner
* @return A Scanner for seeking over the rows of the SSTable.
*/
@@ -1014,7 +1019,7 @@ public class SSTableReader extends SSTable
public SSTableScanner getScanner(RateLimiter limiter)
{
- return new SSTableScanner(this, limiter);
+ return new SSTableScanner(this, null, limiter);
}
/**
@@ -1030,7 +1035,7 @@ public class SSTableReader extends SSTable
Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
if (rangeIterator.hasNext())
- return new SSTableBoundedScanner(this, rangeIterator, limiter);
+ return new SSTableScanner(this, null, range, limiter);
else
return new EmptyCompactionScanner(getFilename());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 38b9e65..fb52a02 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -18,11 +18,12 @@
package org.apache.cassandra.io.sstable;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Iterator;
import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.collect.AbstractIterator;
+
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowPosition;
@@ -31,6 +32,8 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -40,77 +43,83 @@ public class SSTableScanner implements ICompactionScanner
protected final RandomAccessReader dfile;
protected final RandomAccessReader ifile;
public final SSTableReader sstable;
- private OnDiskAtomIterator row;
- protected boolean exhausted = false;
protected Iterator<OnDiskAtomIterator> iterator;
private final QueryFilter filter;
+ private long stopAt;
/**
- * @param sstable SSTable to scan.
- * @param limiter
+ * @param sstable SSTable to scan; must not be null
+ * @param filter filter to use when scanning the columns; may be null
+ * @param limiter background i/o RateLimiter; may be null
*/
- SSTableScanner(SSTableReader sstable, RateLimiter limiter)
+ SSTableScanner(SSTableReader sstable, QueryFilter filter, RateLimiter limiter)
{
- this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
- this.ifile = sstable.openIndexReader();
- this.sstable = sstable;
- this.filter = null;
- }
+ assert sstable != null;
- /**
- * @param sstable SSTable to scan.
- * @param filter filter to use when scanning the columns
- */
- SSTableScanner(SSTableReader sstable, QueryFilter filter)
- {
- this.dfile = sstable.openDataReader();
+ this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
this.sstable = sstable;
this.filter = filter;
+ stopAt = dfile.length();
}
- public void close() throws IOException
+ public SSTableScanner(SSTableReader sstable, QueryFilter filter, RowPosition startWith, RateLimiter limiter)
{
- FileUtils.close(dfile, ifile);
- }
+ this(sstable, filter, limiter);
+
+ long indexPosition = sstable.getIndexScanPosition(startWith);
+ // -1 means the key is before everything in the sstable. So just start from the beginning.
+ if (indexPosition == -1)
+ indexPosition = 0;
+ ifile.seek(indexPosition);
- public void seekTo(RowPosition seekKey)
- {
try
{
- long indexPosition = sstable.getIndexScanPosition(seekKey);
- // -1 means the key is before everything in the sstable. So just start from the beginning.
- if (indexPosition == -1)
- indexPosition = 0;
-
- ifile.seek(indexPosition);
-
while (!ifile.isEOF())
{
- long startPosition = ifile.getFilePointer();
+ indexPosition = ifile.getFilePointer();
DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- int comparison = indexDecoratedKey.compareTo(seekKey);
+ int comparison = indexDecoratedKey.compareTo(startWith);
if (comparison >= 0)
{
// Found, just read the dataPosition and seek into index and data files
long dataPosition = ifile.readLong();
- ifile.seek(startPosition);
+ ifile.seek(indexPosition);
dfile.seek(dataPosition);
- row = null;
- return;
+ break;
}
else
{
RowIndexEntry.serializer.skip(ifile);
}
}
- exhausted = true;
}
catch (IOException e)
{
sstable.markSuspect();
- throw new CorruptSSTableException(e, ifile.getPath());
+ throw new CorruptSSTableException(e, sstable.getFilename());
}
+
+ }
+
+ public SSTableScanner(SSTableReader sstable, QueryFilter filter, Range<Token> range, RateLimiter limiter)
+ {
+ this(sstable, filter, range.toRowBounds().left, limiter);
+
+ if (range.isWrapAround())
+ {
+ stopAt = dfile.length();
+ }
+ else
+ {
+ RowIndexEntry position = sstable.getPosition(range.toRowBounds().right, SSTableReader.Operator.GT);
+ stopAt = position == null ? dfile.length() : position.position;
+ }
+ }
+
+ public void close() throws IOException
+ {
+ FileUtils.close(dfile, ifile);
}
public long getLengthInBytes()
@@ -131,14 +140,14 @@ public class SSTableScanner implements ICompactionScanner
public boolean hasNext()
{
if (iterator == null)
- iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
+ iterator = createIterator();
return iterator.hasNext();
}
public OnDiskAtomIterator next()
{
if (iterator == null)
- iterator = exhausted ? Arrays.asList(new OnDiskAtomIterator[0]).iterator() : createIterator();
+ iterator = createIterator();
return iterator.next();
}
@@ -149,76 +158,24 @@ public class SSTableScanner implements ICompactionScanner
private Iterator<OnDiskAtomIterator> createIterator()
{
- return filter == null ? new KeyScanningIterator() : new FilteredKeyScanningIterator();
- }
-
- protected class KeyScanningIterator implements Iterator<OnDiskAtomIterator>
- {
- protected long finishedAt;
-
- public boolean hasNext()
- {
- if (row == null)
- return !dfile.isEOF();
- return finishedAt < dfile.length();
- }
-
- public OnDiskAtomIterator next()
- {
- try
- {
- if (row != null)
- dfile.seek(finishedAt);
- assert !dfile.isEOF();
-
- // Read data header
- DecoratedKey key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dfile));
- long dataSize = dfile.readLong();
- long dataStart = dfile.getFilePointer();
- finishedAt = dataStart + dataSize;
-
- row = new SSTableIdentityIterator(sstable, dfile, key, dataStart, dataSize);
- return row;
- }
- catch (IOException e)
- {
- sstable.markSuspect();
- throw new CorruptSSTableException(e, dfile.getPath());
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + "(" + "finishedAt:" + finishedAt + ")";
- }
+ return new KeyScanningIterator();
}
- protected class FilteredKeyScanningIterator implements Iterator<OnDiskAtomIterator>
+ protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
{
- protected DecoratedKey nextKey;
- protected RowIndexEntry nextEntry;
-
- public boolean hasNext()
- {
- if (row == null)
- return !ifile.isEOF();
- return nextKey != null;
- }
+ private DecoratedKey nextKey;
+ private RowIndexEntry nextEntry;
+ private DecoratedKey currentKey;
+ private RowIndexEntry currentEntry;
- public OnDiskAtomIterator next()
+ protected OnDiskAtomIterator computeNext()
{
try
{
- final DecoratedKey currentKey;
- final RowIndexEntry currentEntry;
+ if (ifile.isEOF() && nextKey == null)
+ return endOfData();
- if (row == null)
+ if (currentKey == null)
{
currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
currentEntry = RowIndexEntry.serializer.deserialize(ifile, sstable.descriptor.version);
@@ -229,6 +186,10 @@ public class SSTableScanner implements ICompactionScanner
currentEntry = nextEntry;
}
+ assert currentEntry.position <= stopAt;
+ if (currentEntry.position == stopAt)
+ return endOfData();
+
if (ifile.isEOF())
{
nextKey = null;
@@ -241,7 +202,18 @@ public class SSTableScanner implements ICompactionScanner
}
assert !dfile.isEOF();
- return row = new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
+
+ if (filter == null)
+ {
+ dfile.seek(currentEntry.position);
+ ByteBufferUtil.readWithShortLength(dfile); // key
+ if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+ dfile.readLong();
+ long dataSize = (nextEntry == null ? dfile.length() : nextEntry.position) - dfile.getFilePointer();
+ return new SSTableIdentityIterator(sstable, dfile, currentKey, dataSize);
+ }
+
+ return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
{
public OnDiskAtomIterator create()
{
@@ -252,14 +224,9 @@ public class SSTableScanner implements ICompactionScanner
catch (IOException e)
{
sstable.markSuspect();
- throw new CorruptSSTableException(e, ifile.getPath());
+ throw new CorruptSSTableException(e, sstable.getFilename());
}
}
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
}
@Override
@@ -269,7 +236,6 @@ public class SSTableScanner implements ICompactionScanner
"dfile=" + dfile +
" ifile=" + ifile +
" sstable=" + sstable +
- " exhausted=" + exhausted +
")";
}
}