You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/31 14:51:14 UTC
svn commit: r981046 - in /cassandra/trunk:
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/tools/
test/unit/org/apache/cassandra/io/sstable/ test/unit/org/a...
Author: jbellis
Date: Sat Jul 31 12:51:14 2010
New Revision: 981046
URL: http://svn.apache.org/viewvc?rev=981046&view=rev
Log:
rebuild secondary indexes after streaming. patch by Nate McCall and jbellis for CASSANDRA-1258
Added:
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Jul 31 12:51:14 2010
@@ -27,6 +27,7 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.*;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1130,4 +1131,9 @@ public class DatabaseDescriptor
{
return getCFMetaData(keyspace, cf).getValueValidator(column);
}
+
+ public static CFMetaData getCFMetaData(Descriptor desc)
+ {
+ return getCFMetaData(desc.ksname, desc.cfname);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Sat Jul 31 12:51:14 2010
@@ -122,7 +122,7 @@ public class BinaryMemtable implements I
{
logger.info("Writing " + this);
String path = cfs.getFlushPath();
- SSTableWriter writer = new SSTableWriter(path, sortedKeys.size(), StorageService.getPartitioner());
+ SSTableWriter writer = new SSTableWriter(path, sortedKeys.size(), cfs.metadata, cfs.partitioner_);
for (DecoratedKey key : sortedKeys)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Jul 31 12:51:14 2010
@@ -26,12 +26,10 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
@@ -62,7 +60,6 @@ import org.apache.cassandra.thrift.Index
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@@ -112,7 +109,7 @@ public class ColumnFamilyStore implement
public final String table_;
public final String columnFamily_;
- private final IPartitioner partitioner_;
+ public final IPartitioner partitioner_;
private volatile int memtableSwitchCount = 0;
@@ -206,7 +203,7 @@ public class ColumnFamilyStore implement
SSTableReader sstable;
try
{
- sstable = SSTableReader.open(filename, partitioner_);
+ sstable = SSTableReader.open(Descriptor.fromFilename(filename), metadata, partitioner_);
}
catch (IOException ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sat Jul 31 12:51:14 2010
@@ -340,7 +340,7 @@ public class CompactionManager implement
}
String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
- writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
+ writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner_);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
@@ -430,7 +430,7 @@ public class CompactionManager implement
{
FileUtils.createDirectory(compactionFileLocation);
String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
- writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
+ writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner_);
}
writer.append(row);
totalkeysWritten++;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Sat Jul 31 12:51:14 2010
@@ -146,7 +146,7 @@ public class Memtable implements Compara
private SSTableReader writeSortedContents() throws IOException
{
logger.info("Writing " + this);
- SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), partitioner);
+ SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), cfs.metadata, partitioner);
for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
writer.append(entry.getKey(), entry.getValue());
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sat Jul 31 12:51:14 2010
@@ -413,6 +413,26 @@ public class Table
entry.getKey().maybeSwitchMemtable(entry.getValue(), writeCommitLog);
}
+ public void applyIndexedCF(ColumnFamilyStore indexedCfs, DecoratedKey rowKey, DecoratedKey indexedKey, ColumnFamily indexedColumnFamily)
+ {
+ Memtable memtableToFlush;
+ flusherLock.readLock().lock();
+ try
+ {
+ synchronized (indexLocks[Arrays.hashCode(rowKey.key) % indexLocks.length])
+ {
+ memtableToFlush = indexedCfs.apply(indexedKey, indexedColumnFamily);
+ }
+ }
+ finally
+ {
+ flusherLock.readLock().unlock();
+ }
+
+ if (memtableToFlush != null)
+ indexedCfs.maybeSwitchMemtable(memtableToFlush, false);
+ }
+
private static void applyCF(ColumnFamilyStore cfs, DecoratedKey key, ColumnFamily columnFamily, HashMap<ColumnFamilyStore, Memtable> memtablesToFlush)
{
Memtable memtableToFlush = cfs.apply(key, columnFamily);
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Sat Jul 31 12:51:14 2010
@@ -29,6 +29,7 @@ import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.db.StatisticsTable;
@@ -50,7 +51,6 @@ public abstract class SSTable
{
static final Logger logger = LoggerFactory.getLogger(SSTable.class);
- public static final int FILES_ON_DISK = 3; // data, index, and bloom filter
public static final String COMPONENT_DATA = "Data.db";
public static final String COMPONENT_INDEX = "Index.db";
public static final String COMPONENT_FILTER = "Filter.db";
@@ -58,6 +58,7 @@ public abstract class SSTable
public static final String COMPONENT_COMPACTED = "Compacted";
protected Descriptor desc;
+ protected final CFMetaData metadata;
protected IPartitioner partitioner;
public static final String TEMPFILE_MARKER = "tmp";
@@ -66,16 +67,15 @@ public abstract class SSTable
protected EstimatedHistogram estimatedRowSize = new EstimatedHistogram(130);
protected EstimatedHistogram estimatedColumnCount = new EstimatedHistogram(112);
- protected SSTable(String filename, IPartitioner partitioner)
+ protected SSTable(String filename, CFMetaData metadata, IPartitioner partitioner)
{
- assert filename.endsWith("-" + COMPONENT_DATA);
- this.desc = Descriptor.fromFilename(filename);
- this.partitioner = partitioner;
+ this(Descriptor.fromFilename(filename), metadata, partitioner);
}
- protected SSTable(Descriptor desc, IPartitioner partitioner)
+ protected SSTable(Descriptor desc, CFMetaData metadata, IPartitioner partitioner)
{
this.desc = desc;
+ this.metadata = metadata;
this.partitioner = partitioner;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Sat Jul 31 12:51:14 2010
@@ -20,34 +20,33 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
-import java.util.*;
-import java.lang.ref.ReferenceQueue;
import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.*;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
-import org.apache.cassandra.db.StatisticsTable;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.EstimatedHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.InstrumentedCache;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
/**
* SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
@@ -158,23 +157,12 @@ public class SSTableReader extends SSTab
}
}
- public static SSTableReader open(String dataFileName) throws IOException
- {
- return open(Descriptor.fromFilename(dataFileName));
- }
-
public static SSTableReader open(Descriptor desc) throws IOException
{
- return open(desc, StorageService.getPartitioner());
+ return open(desc, DatabaseDescriptor.getCFMetaData(desc.ksname, desc.cfname), StorageService.getPartitioner());
}
- /** public, but only for tests */
- public static SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
- {
- return open(Descriptor.fromFilename(dataFileName), partitioner);
- }
-
- public static SSTableReader open(Descriptor descriptor, IPartitioner partitioner) throws IOException
+ public static SSTableReader open(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
assert partitioner != null;
@@ -185,7 +173,7 @@ public class SSTableReader extends SSTab
// FIXME: version conditional readers here
if (true)
{
- sstable = internalOpen(descriptor, partitioner);
+ sstable = internalOpen(descriptor, metadata, partitioner);
}
if (logger.isDebugEnabled())
@@ -195,9 +183,9 @@ public class SSTableReader extends SSTab
}
/** Open a RowIndexedReader which needs its state loaded from disk. */
- static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner) throws IOException
+ static SSTableReader internalOpen(Descriptor desc, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
- SSTableReader sstable = new SSTableReader(desc, partitioner, null, null, null, null, System.currentTimeMillis());
+ SSTableReader sstable = new SSTableReader(desc, metadata, partitioner, null, null, null, null, System.currentTimeMillis());
// versions before 'c' encoded keys as utf-16 before hashing to the filter
if (desc.hasStringsInBloomFilter)
@@ -214,16 +202,17 @@ public class SSTableReader extends SSTab
return sstable;
}
- SSTableReader(Descriptor desc,
- IPartitioner partitioner,
- SegmentedFile ifile,
- SegmentedFile dfile,
- IndexSummary indexSummary,
- BloomFilter bloomFilter,
- long maxDataAge)
+ private SSTableReader(Descriptor desc,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ SegmentedFile ifile,
+ SegmentedFile dfile,
+ IndexSummary indexSummary,
+ BloomFilter bloomFilter,
+ long maxDataAge)
throws IOException
{
- super(desc, partitioner);
+ super(desc, metadata, partitioner);
this.maxDataAge = maxDataAge;
@@ -236,25 +225,26 @@ public class SSTableReader extends SSTab
/**
* Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
*/
- static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge, EstimatedHistogram rowsize,
+ static SSTableReader internalOpen(Descriptor desc, CFMetaData metadata, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge, EstimatedHistogram rowsize,
EstimatedHistogram columncount) throws IOException
{
assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
- return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
+ return new SSTableReader(desc, metadata, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
}
SSTableReader(Descriptor desc,
- IPartitioner partitioner,
- SegmentedFile ifile,
- SegmentedFile dfile,
- IndexSummary indexSummary,
- BloomFilter bloomFilter,
- long maxDataAge,
- EstimatedHistogram rowsize,
- EstimatedHistogram columncount)
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ SegmentedFile ifile,
+ SegmentedFile dfile,
+ IndexSummary indexSummary,
+ BloomFilter bloomFilter,
+ long maxDataAge,
+ EstimatedHistogram rowsize,
+ EstimatedHistogram columncount)
throws IOException
{
- super(desc, partitioner);
+ super(desc, metadata, partitioner);
this.maxDataAge = maxDataAge;
@@ -563,22 +553,19 @@ public class SSTableReader extends SSTab
public AbstractType getColumnComparator()
{
- return DatabaseDescriptor.getComparator(getTableName(), getColumnFamilyName());
+ return metadata.comparator;
}
public ColumnFamily makeColumnFamily()
{
- return ColumnFamily.create(getTableName(), getColumnFamilyName());
+ return ColumnFamily.create(metadata);
}
public ICompactSerializer2<IColumn> getColumnSerializer()
{
- ColumnFamilyType cfType = DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName());
- ClockType clockType = DatabaseDescriptor.getClockType(getTableName(), getColumnFamilyName());
- AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(getTableName(), getColumnFamilyName());
- return cfType == ColumnFamilyType.Standard
- ? Column.serializer(clockType)
- : SuperColumn.serializer(getColumnComparator(), clockType, reconciler);
+ return metadata.cfType == ColumnFamilyType.Standard
+ ? Column.serializer(metadata.clockType)
+ : SuperColumn.serializer(getColumnComparator(), metadata.clockType, metadata.reconciler);
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Sat Jul 31 12:51:14 2010
@@ -20,15 +20,16 @@
package org.apache.cassandra.io.sstable;
import java.io.*;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.StatisticsTable;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.AbstractCompactedRow;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
@@ -47,9 +48,14 @@ public class SSTableWriter extends SSTab
private final BufferedRandomAccessFile dataFile;
private DecoratedKey lastWrittenKey;
- public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws IOException
+ public SSTableWriter(String filename, long keyCount) throws IOException
{
- super(filename, partitioner);
+ this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner());
+ }
+
+ public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ {
+ super(filename, metadata, partitioner);
iwriter = new IndexWriter(desc, partitioner, keyCount);
dbuilder = SegmentedFile.getBuilder();
dataFile = new BufferedRandomAccessFile(getFilename(), "rw", DatabaseDescriptor.getInMemoryCompactionLimit());
@@ -149,7 +155,7 @@ public class SSTableWriter extends SSTab
// finalize in-memory state for the reader
SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
- SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
+ SSTableReader sstable = SSTableReader.internalOpen(newdesc, metadata, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
iwriter = null;
dbuilder = null;
return sstable;
@@ -202,12 +208,15 @@ public class SSTableWriter extends SSTab
*/
private static void maybeRecover(Descriptor desc) throws IOException
{
+ logger.debug("In maybeRecover with Descriptor {}", desc);
File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
if (ifile.exists() && ffile.exists())
// nothing to do
return;
+ ColumnFamilyStore cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+ Set<byte[]> indexedColumns = cfs.getIndexedColumns();
// remove existing files
ifile.delete();
ffile.delete();
@@ -217,8 +226,8 @@ public class SSTableWriter extends SSTab
IndexWriter iwriter;
long estimatedRows;
try
- {
- estimatedRows = estimateRows(desc, dfile);
+ {
+ estimatedRows = estimateRows(desc, dfile);
iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
}
catch(IOException e)
@@ -237,11 +246,54 @@ public class SSTableWriter extends SSTab
{
key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, FBUtilities.readShortByteArray(dfile));
long dataSize = SSTableReader.readRowSize(dfile, desc);
+ if (!indexedColumns.isEmpty())
+ {
+ // skip bloom filter and column index
+ dfile.readFully(new byte[dfile.readInt()]);
+ dfile.readFully(new byte[dfile.readInt()]);
+
+ // index the column data
+ ColumnFamily cf = ColumnFamily.create(desc.ksname, desc.cfname);
+ ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
+ int columns = dfile.readInt();
+ for (int i = 0; i < columns; i++)
+ {
+ IColumn iColumn = cf.getColumnSerializer().deserialize(dfile);
+ if (indexedColumns.contains(iColumn.name()))
+ {
+ DecoratedKey valueKey = cfs.getIndexKeyFor(iColumn.name(), iColumn.value());
+ ColumnFamily indexedCf = cfs.newIndexedColumnFamily(iColumn.name());
+ indexedCf.addColumn(new Column(key.key, ArrayUtils.EMPTY_BYTE_ARRAY, iColumn.clock()));
+ logger.debug("adding indexed column row mutation for key {}", valueKey);
+ Table.open(desc.ksname).applyIndexedCF(cfs.getIndexedColumnFamilyStore(iColumn.name()),
+ key,
+ valueKey,
+ indexedCf);
+ }
+ }
+ }
+
iwriter.afterAppend(key, dataPosition);
dataPosition = dfile.getFilePointer() + dataSize;
dfile.seek(dataPosition);
rows++;
}
+
+ for (byte[] column : cfs.getIndexedColumns())
+ {
+ try
+ {
+ cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
}
finally
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Sat Jul 31 12:51:14 2010
@@ -197,7 +197,7 @@ public class SSTableExport
public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes)
throws IOException
{
- SSTableReader reader = SSTableReader.open(ssTableFile);
+ SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
SSTableScanner scanner = reader.getScanner(INPUT_FILE_BUFFER_SIZE);
IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
Set<String> excludeSet = new HashSet();
@@ -309,7 +309,7 @@ public class SSTableExport
*/
public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
{
- SSTableReader reader = SSTableReader.open(ssTableFile);
+ SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
export(reader, outs, excludes);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Sat Jul 31 12:51:14 2010
@@ -159,7 +159,7 @@ public class SSTableImport
{
JSONObject json = (JSONObject)JSONValue.parseWithException(new FileReader(jsonFile));
- SSTableWriter writer = new SSTableWriter(ssTablePath, json.size(), partitioner);
+ SSTableWriter writer = new SSTableWriter(ssTablePath, json.size());
SortedMap<DecoratedKey,String> decoratedKeys = new TreeMap<DecoratedKey,String>();
// sort by dk representation, but hold onto the hex version
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Sat Jul 31 12:51:14 2010
@@ -85,7 +85,7 @@ public class SSTableUtils
public static SSTableReader writeRawSSTable(String tablename, String cfname, Map<byte[], byte[]> entries) throws IOException
{
File datafile = tempSSTableFile(tablename, cfname);
- SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), entries.size(), StorageService.getPartitioner());
+ SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), entries.size());
SortedMap<DecoratedKey, byte[]> sortedEntries = new TreeMap<DecoratedKey, byte[]>();
for (Map.Entry<byte[], byte[]> entry : entries.entrySet())
sortedEntries.put(writer.partitioner.decorateKey(entry.getKey()), entry.getValue());
Added: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=981046&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Sat Jul 31 12:51:14 2010
@@ -0,0 +1,75 @@
+package org.apache.cassandra.io.sstable;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TimestampClock;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Test;
+
+public class SSTableWriterTest extends CleanupHelper {
+
+ @Test
+ public void testRecoverAndOpen() throws IOException
+ {
+ RowMutation rm;
+
+ rm = new RowMutation("Keyspace1", "k1".getBytes());
+ rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0));
+ rm.apply();
+
+ ColumnFamily cf = ColumnFamily.create("Keyspace1", "Indexed1");
+ cf.addColumn(new Column("birthdate".getBytes(), FBUtilities.toByteArray(1L), new TimestampClock(0)));
+ cf.addColumn(new Column("anydate".getBytes(), FBUtilities.toByteArray(1L), new TimestampClock(0)));
+
+ Map<byte[], byte[]> entries = new HashMap<byte[], byte[]>();
+
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+ entries.put("k2".getBytes(), Arrays.copyOf(buffer.getData(), buffer.getLength()));
+ cf.clear();
+
+ cf.addColumn(new Column("anydate".getBytes(), FBUtilities.toByteArray(1L), new TimestampClock(0)));
+ buffer = new DataOutputBuffer();
+ ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+ entries.put("k3".getBytes(), Arrays.copyOf(buffer.getData(), buffer.getLength()));
+
+ SSTableReader orig = SSTableUtils.writeRawSSTable("Keyspace1", "Indexed1", entries);
+ // whack the index to trigger the recover
+ new File(orig.indexFilename()).delete();
+ new File(orig.filterFilename()).delete();
+
+ SSTableReader sstr = SSTableWriter.recoverAndOpen(orig.desc);
+
+ ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
+ cfs.addSSTable(sstr);
+
+ IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(1L));
+ IndexClause clause = new IndexClause(Arrays.asList(expr), 100);
+ IFilter filter = new IdentityQueryFilter();
+ List<Row> rows = cfs.scan(clause, filter);
+
+ assertEquals("IndexExpression should return two rows on recoverAndOpen",2, rows.size());
+ assertTrue("First result should be 'k1'",Arrays.equals("k1".getBytes(), rows.get(0).key.key));
+ }
+}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java Sat Jul 31 12:51:14 2010
@@ -31,9 +31,10 @@ import org.apache.cassandra.db.Timestamp
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+
import static org.apache.cassandra.io.sstable.SSTableUtils.tempSSTableFile;
import static org.apache.cassandra.utils.FBUtilities.bytesToHex;
import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
@@ -59,8 +60,7 @@ public class SSTableExportTest extends S
{
File tempSS = tempSSTableFile("Keyspace1", "Standard1");
ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
- IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
- SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+ SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
// Add rowA
cfamily.addColumn(new QueryPath("Standard1", null, "colA".getBytes()), "valA".getBytes(), new TimestampClock(1));
@@ -92,8 +92,7 @@ public class SSTableExportTest extends S
public void testExportSimpleCf() throws IOException {
File tempSS = tempSSTableFile("Keyspace1", "Standard1");
ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
- IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
- SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+ SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
// Add rowA
cfamily.addColumn(new QueryPath("Standard1", null, "colA".getBytes()), "valA".getBytes(), new TimestampClock(1));
@@ -135,8 +134,7 @@ public class SSTableExportTest extends S
{
File tempSS = tempSSTableFile("Keyspace1", "Super4");
ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Super4");
- IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
- SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+ SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
// Add rowA
cfamily.addColumn(new QueryPath("Super4", "superA".getBytes(), "colA".getBytes()), "valA".getBytes(), new TimestampClock(1));
@@ -176,8 +174,7 @@ public class SSTableExportTest extends S
{
File tempSS = tempSSTableFile("Keyspace1", "Standard1");
ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
- IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
- SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+ SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
// Add rowA
cfamily.addColumn(new QueryPath("Standard1", null, "name".getBytes()), "val".getBytes(), new TimestampClock(1));
@@ -197,9 +194,9 @@ public class SSTableExportTest extends S
// Import JSON to another SSTable file
File tempSS2 = tempSSTableFile("Keyspace1", "Standard1");
- SSTableImport.importJson(tempJson.getPath(), "Keyspace1", "Standard1", tempSS2.getPath());
-
- reader = SSTableReader.open(tempSS2.getPath(), DatabaseDescriptor.getPartitioner());
+ SSTableImport.importJson(tempJson.getPath(), "Keyspace1", "Standard1", tempSS2.getPath());
+
+ reader = SSTableReader.open(Descriptor.fromFilename(tempSS2.getPath()));
QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Standard1", null, null), "name".getBytes());
ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
assertTrue(cf != null);
Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java Sat Jul 31 12:51:14 2010
@@ -26,9 +26,9 @@ import org.apache.cassandra.SchemaLoader
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
import static org.apache.cassandra.io.sstable.SSTableUtils.tempSSTableFile;
@@ -49,7 +49,7 @@ public class SSTableImportTest extends S
SSTableImport.importJson(jsonUrl, "Keyspace1", "Standard1", tempSS.getPath());
// Verify results
- SSTableReader reader = SSTableReader.open(tempSS.getPath(), DatabaseDescriptor.getPartitioner());
+ SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Standard1", null, null), "colAA".getBytes());
ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
assert Arrays.equals(cf.getColumn("colAA".getBytes()).value(), hexToBytes("76616c4141"));
@@ -63,7 +63,7 @@ public class SSTableImportTest extends S
SSTableImport.importJson(jsonUrl, "Keyspace1", "Super4", tempSS.getPath());
// Verify results
- SSTableReader reader = SSTableReader.open(tempSS.getPath(), DatabaseDescriptor.getPartitioner());
+ SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Super4", null, null), "superA".getBytes());
ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
IColumn superCol = cf.getColumn("superA".getBytes());