You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/31 14:51:14 UTC

svn commit: r981046 - in /cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/tools/ test/unit/org/apache/cassandra/io/sstable/ test/unit/org/apache/cassandra/tools/

Author: jbellis
Date: Sat Jul 31 12:51:14 2010
New Revision: 981046

URL: http://svn.apache.org/viewvc?rev=981046&view=rev
Log:
Rebuild secondary indexes after streaming. Patch by Nate McCall and jbellis for CASSANDRA-1258.

Added:
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Jul 31 12:51:14 2010
@@ -27,6 +27,7 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1130,4 +1131,9 @@ public class DatabaseDescriptor
     {
         return getCFMetaData(keyspace, cf).getValueValidator(column);
     }
+
+    public static CFMetaData getCFMetaData(Descriptor desc)
+    {
+        return getCFMetaData(desc.ksname, desc.cfname);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Sat Jul 31 12:51:14 2010
@@ -122,7 +122,7 @@ public class BinaryMemtable implements I
     {
         logger.info("Writing " + this);
         String path = cfs.getFlushPath();
-        SSTableWriter writer = new SSTableWriter(path, sortedKeys.size(), StorageService.getPartitioner());
+        SSTableWriter writer = new SSTableWriter(path, sortedKeys.size(), cfs.metadata, cfs.partitioner_);
 
         for (DecoratedKey key : sortedKeys)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Jul 31 12:51:14 2010
@@ -26,12 +26,10 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
@@ -62,7 +60,6 @@ import org.apache.cassandra.thrift.Index
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.SimpleCondition;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@@ -112,7 +109,7 @@ public class ColumnFamilyStore implement
 
     public final String table_;
     public final String columnFamily_;
-    private final IPartitioner partitioner_;
+    public final IPartitioner partitioner_;
 
     private volatile int memtableSwitchCount = 0;
 
@@ -206,7 +203,7 @@ public class ColumnFamilyStore implement
             SSTableReader sstable;
             try
             {
-                sstable = SSTableReader.open(filename, partitioner_);
+                sstable = SSTableReader.open(Descriptor.fromFilename(filename), metadata, partitioner_);
             }
             catch (IOException ex)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sat Jul 31 12:51:14 2010
@@ -340,7 +340,7 @@ public class CompactionManager implement
             }
 
             String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
-            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
+            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner_);
             while (nni.hasNext())
             {
                 AbstractCompactedRow row = nni.next();
@@ -430,7 +430,7 @@ public class CompactionManager implement
                 {
                     FileUtils.createDirectory(compactionFileLocation);
                     String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
-                    writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
+                    writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata, cfs.partitioner_);
                 }
                 writer.append(row);
                 totalkeysWritten++;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Sat Jul 31 12:51:14 2010
@@ -146,7 +146,7 @@ public class Memtable implements Compara
     private SSTableReader writeSortedContents() throws IOException
     {
         logger.info("Writing " + this);
-        SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), partitioner);
+        SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(), columnFamilies.size(), cfs.metadata, partitioner);
 
         for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
             writer.append(entry.getKey(), entry.getValue());

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sat Jul 31 12:51:14 2010
@@ -413,6 +413,26 @@ public class Table
             entry.getKey().maybeSwitchMemtable(entry.getValue(), writeCommitLog);
     }
 
+    public void applyIndexedCF(ColumnFamilyStore indexedCfs, DecoratedKey rowKey, DecoratedKey indexedKey, ColumnFamily indexedColumnFamily) 
+    {
+        Memtable memtableToFlush;
+        flusherLock.readLock().lock();
+        try
+        {
+            synchronized (indexLocks[Arrays.hashCode(rowKey.key) % indexLocks.length])
+            {
+                memtableToFlush = indexedCfs.apply(indexedKey, indexedColumnFamily);
+            }
+        }
+        finally 
+        {
+            flusherLock.readLock().unlock();
+        }
+
+        if (memtableToFlush != null)
+            indexedCfs.maybeSwitchMemtable(memtableToFlush, false);
+    }
+    
     private static void applyCF(ColumnFamilyStore cfs, DecoratedKey key, ColumnFamily columnFamily, HashMap<ColumnFamilyStore, Memtable> memtablesToFlush)
     {
         Memtable memtableToFlush = cfs.apply(key, columnFamily);

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Sat Jul 31 12:51:14 2010
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.db.StatisticsTable;
@@ -50,7 +51,6 @@ public abstract class SSTable
 {
     static final Logger logger = LoggerFactory.getLogger(SSTable.class);
 
-    public static final int FILES_ON_DISK = 3; // data, index, and bloom filter
     public static final String COMPONENT_DATA = "Data.db";
     public static final String COMPONENT_INDEX = "Index.db";
     public static final String COMPONENT_FILTER = "Filter.db";
@@ -58,6 +58,7 @@ public abstract class SSTable
     public static final String COMPONENT_COMPACTED = "Compacted";
 
     protected Descriptor desc;
+    protected final CFMetaData metadata;
     protected IPartitioner partitioner;
 
     public static final String TEMPFILE_MARKER = "tmp";
@@ -66,16 +67,15 @@ public abstract class SSTable
     protected EstimatedHistogram estimatedRowSize = new EstimatedHistogram(130);
     protected EstimatedHistogram estimatedColumnCount = new EstimatedHistogram(112);
 
-    protected SSTable(String filename, IPartitioner partitioner)
+    protected SSTable(String filename, CFMetaData metadata, IPartitioner partitioner)
     {
-        assert filename.endsWith("-" + COMPONENT_DATA);
-        this.desc = Descriptor.fromFilename(filename);
-        this.partitioner = partitioner;
+        this(Descriptor.fromFilename(filename), metadata, partitioner);
     }
 
-    protected SSTable(Descriptor desc, IPartitioner partitioner)
+    protected SSTable(Descriptor desc, CFMetaData metadata, IPartitioner partitioner)
     {
         this.desc = desc;
+        this.metadata = metadata;
         this.partitioner = partitioner;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Sat Jul 31 12:51:14 2010
@@ -20,34 +20,33 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
-import java.util.*;
-import java.lang.ref.ReferenceQueue;
 import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
-import org.apache.cassandra.db.StatisticsTable;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.EstimatedHistogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.InstrumentedCache;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
@@ -158,23 +157,12 @@ public class SSTableReader extends SSTab
         }
     }
 
-    public static SSTableReader open(String dataFileName) throws IOException
-    {
-        return open(Descriptor.fromFilename(dataFileName));
-    }
-
     public static SSTableReader open(Descriptor desc) throws IOException
     {
-        return open(desc, StorageService.getPartitioner());
+        return open(desc, DatabaseDescriptor.getCFMetaData(desc.ksname, desc.cfname), StorageService.getPartitioner());
     }
 
-    /** public, but only for tests */
-    public static SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
-    {
-        return open(Descriptor.fromFilename(dataFileName), partitioner);
-    }
-
-    public static SSTableReader open(Descriptor descriptor, IPartitioner partitioner) throws IOException
+    public static SSTableReader open(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
         assert partitioner != null;
 
@@ -185,7 +173,7 @@ public class SSTableReader extends SSTab
         // FIXME: version conditional readers here
         if (true)
         {
-            sstable = internalOpen(descriptor, partitioner);
+            sstable = internalOpen(descriptor, metadata, partitioner);
         }
 
         if (logger.isDebugEnabled())
@@ -195,9 +183,9 @@ public class SSTableReader extends SSTab
     }
 
     /** Open a RowIndexedReader which needs its state loaded from disk. */
-    static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner) throws IOException
+    static SSTableReader internalOpen(Descriptor desc, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
-        SSTableReader sstable = new SSTableReader(desc, partitioner, null, null, null, null, System.currentTimeMillis());
+        SSTableReader sstable = new SSTableReader(desc, metadata, partitioner, null, null, null, null, System.currentTimeMillis());
 
         // versions before 'c' encoded keys as utf-16 before hashing to the filter
         if (desc.hasStringsInBloomFilter)
@@ -214,16 +202,17 @@ public class SSTableReader extends SSTab
         return sstable;
     }
 
-    SSTableReader(Descriptor desc,
-                  IPartitioner partitioner,
-                  SegmentedFile ifile,
-                  SegmentedFile dfile,
-                  IndexSummary indexSummary,
-                  BloomFilter bloomFilter,
-                  long maxDataAge)
+    private SSTableReader(Descriptor desc,
+                          CFMetaData metadata,
+                          IPartitioner partitioner,
+                          SegmentedFile ifile,
+                          SegmentedFile dfile,
+                          IndexSummary indexSummary,
+                          BloomFilter bloomFilter,
+                          long maxDataAge)
             throws IOException
     {
-        super(desc, partitioner);
+        super(desc, metadata, partitioner);
         this.maxDataAge = maxDataAge;
 
 
@@ -236,25 +225,26 @@ public class SSTableReader extends SSTab
     /**
      * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
      */
-    static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge, EstimatedHistogram rowsize,
+    static SSTableReader internalOpen(Descriptor desc, CFMetaData metadata, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge, EstimatedHistogram rowsize,
                                       EstimatedHistogram columncount) throws IOException
     {
         assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
-        return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
+        return new SSTableReader(desc, metadata, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
     }
 
     SSTableReader(Descriptor desc,
-                     IPartitioner partitioner,
-                     SegmentedFile ifile,
-                     SegmentedFile dfile,
-                     IndexSummary indexSummary,
-                     BloomFilter bloomFilter,
-                     long maxDataAge,
-                     EstimatedHistogram rowsize,
-                     EstimatedHistogram columncount)
+                  CFMetaData metadata,
+                  IPartitioner partitioner,
+                  SegmentedFile ifile,
+                  SegmentedFile dfile,
+                  IndexSummary indexSummary,
+                  BloomFilter bloomFilter,
+                  long maxDataAge,
+                  EstimatedHistogram rowsize,
+                  EstimatedHistogram columncount)
     throws IOException
     {
-        super(desc, partitioner);
+        super(desc, metadata, partitioner);
         this.maxDataAge = maxDataAge;
 
 
@@ -563,22 +553,19 @@ public class SSTableReader extends SSTab
 
     public AbstractType getColumnComparator()
     {
-        return DatabaseDescriptor.getComparator(getTableName(), getColumnFamilyName());
+        return metadata.comparator;
     }
 
     public ColumnFamily makeColumnFamily()
     {
-        return ColumnFamily.create(getTableName(), getColumnFamilyName());
+        return ColumnFamily.create(metadata);
     }
 
     public ICompactSerializer2<IColumn> getColumnSerializer()
     {
-        ColumnFamilyType cfType = DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName());
-        ClockType clockType = DatabaseDescriptor.getClockType(getTableName(), getColumnFamilyName());
-        AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(getTableName(), getColumnFamilyName());
-        return cfType == ColumnFamilyType.Standard
-               ? Column.serializer(clockType)
-               : SuperColumn.serializer(getColumnComparator(), clockType, reconciler);
+        return metadata.cfType == ColumnFamilyType.Standard
+               ? Column.serializer(metadata.clockType)
+               : SuperColumn.serializer(getColumnComparator(), metadata.clockType, metadata.reconciler);
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Sat Jul 31 12:51:14 2010
@@ -20,15 +20,16 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.StatisticsTable;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
@@ -47,9 +48,14 @@ public class SSTableWriter extends SSTab
     private final BufferedRandomAccessFile dataFile;
     private DecoratedKey lastWrittenKey;
 
-    public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws IOException
+    public SSTableWriter(String filename, long keyCount) throws IOException
     {
-        super(filename, partitioner);
+        this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner());
+    }
+
+    public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner) throws IOException
+    {
+        super(filename, metadata, partitioner);
         iwriter = new IndexWriter(desc, partitioner, keyCount);
         dbuilder = SegmentedFile.getBuilder();
         dataFile = new BufferedRandomAccessFile(getFilename(), "rw", DatabaseDescriptor.getInMemoryCompactionLimit());
@@ -149,7 +155,7 @@ public class SSTableWriter extends SSTab
         // finalize in-memory state for the reader
         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
         SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
+        SSTableReader sstable = SSTableReader.internalOpen(newdesc, metadata, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
         iwriter = null;
         dbuilder = null;
         return sstable;
@@ -202,12 +208,15 @@ public class SSTableWriter extends SSTab
      */
     private static void maybeRecover(Descriptor desc) throws IOException
     {
+        logger.debug("In maybeRecover with Descriptor {}", desc);
         File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
         File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
         if (ifile.exists() && ffile.exists())
             // nothing to do
             return;
 
+        ColumnFamilyStore cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
+        Set<byte[]> indexedColumns = cfs.getIndexedColumns();
         // remove existing files
         ifile.delete();
         ffile.delete();
@@ -217,8 +226,8 @@ public class SSTableWriter extends SSTab
         IndexWriter iwriter;
         long estimatedRows;
         try
-        {
-            estimatedRows = estimateRows(desc, dfile);
+        {            
+            estimatedRows = estimateRows(desc, dfile);            
             iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
         }
         catch(IOException e)
@@ -237,11 +246,54 @@ public class SSTableWriter extends SSTab
             {
                 key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, FBUtilities.readShortByteArray(dfile));
                 long dataSize = SSTableReader.readRowSize(dfile, desc);
+                if (!indexedColumns.isEmpty())
+                {
+                    // skip bloom filter and column index
+                    dfile.readFully(new byte[dfile.readInt()]);
+                    dfile.readFully(new byte[dfile.readInt()]);
+
+                    // index the column data
+                    ColumnFamily cf = ColumnFamily.create(desc.ksname, desc.cfname);
+                    ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
+                    int columns = dfile.readInt();
+                    for (int i = 0; i < columns; i++)
+                    {
+                        IColumn iColumn = cf.getColumnSerializer().deserialize(dfile);
+                        if (indexedColumns.contains(iColumn.name()))
+                        {
+                            DecoratedKey valueKey = cfs.getIndexKeyFor(iColumn.name(), iColumn.value());
+                            ColumnFamily indexedCf = cfs.newIndexedColumnFamily(iColumn.name());
+                            indexedCf.addColumn(new Column(key.key, ArrayUtils.EMPTY_BYTE_ARRAY, iColumn.clock()));
+                            logger.debug("adding indexed column row mutation for key {}", valueKey);
+                            Table.open(desc.ksname).applyIndexedCF(cfs.getIndexedColumnFamilyStore(iColumn.name()),
+                                                                   key,
+                                                                   valueKey,
+                                                                   indexedCf);
+                        }
+                    }
+                }
+
                 iwriter.afterAppend(key, dataPosition);
                 dataPosition = dfile.getFilePointer() + dataSize;
                 dfile.seek(dataPosition);
                 rows++;
             }
+
+            for (byte[] column : cfs.getIndexedColumns())
+            {
+                try
+                {
+                    cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
         }
         finally
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Sat Jul 31 12:51:14 2010
@@ -197,7 +197,7 @@ public class SSTableExport
     public static void export(String ssTableFile, PrintStream outs, String[] keys, String[] excludes)
     throws IOException
     {
-        SSTableReader reader = SSTableReader.open(ssTableFile);
+        SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
         SSTableScanner scanner = reader.getScanner(INPUT_FILE_BUFFER_SIZE);
         IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();    
         Set<String> excludeSet = new HashSet();
@@ -309,7 +309,7 @@ public class SSTableExport
      */
     public static void export(String ssTableFile, PrintStream outs, String[] excludes) throws IOException
     {
-        SSTableReader reader = SSTableReader.open(ssTableFile);
+        SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(ssTableFile));
         export(reader, outs, excludes);
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Sat Jul 31 12:51:14 2010
@@ -159,7 +159,7 @@ public class SSTableImport
         {
             JSONObject json = (JSONObject)JSONValue.parseWithException(new FileReader(jsonFile));
             
-            SSTableWriter writer = new SSTableWriter(ssTablePath, json.size(), partitioner);
+            SSTableWriter writer = new SSTableWriter(ssTablePath, json.size());
             SortedMap<DecoratedKey,String> decoratedKeys = new TreeMap<DecoratedKey,String>();
             
             // sort by dk representation, but hold onto the hex version

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Sat Jul 31 12:51:14 2010
@@ -85,7 +85,7 @@ public class SSTableUtils
     public static SSTableReader writeRawSSTable(String tablename, String cfname, Map<byte[], byte[]> entries) throws IOException
     {
         File datafile = tempSSTableFile(tablename, cfname);
-        SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), entries.size(), StorageService.getPartitioner());
+        SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), entries.size());
         SortedMap<DecoratedKey, byte[]> sortedEntries = new TreeMap<DecoratedKey, byte[]>();
         for (Map.Entry<byte[], byte[]> entry : entries.entrySet())
             sortedEntries.put(writer.partitioner.decorateKey(entry.getKey()), entry.getValue());

Added: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=981046&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Sat Jul 31 12:51:14 2010
@@ -0,0 +1,75 @@
+package org.apache.cassandra.io.sstable;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TimestampClock;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Test;
+
+public class SSTableWriterTest extends CleanupHelper {
+    // Checks that an sstable reopened via SSTableWriter.recoverAndOpen is visible to an index scan on "birthdate".
+    @Test
+    public void testRecoverAndOpen() throws IOException
+    {
+        RowMutation rm;
+        // k1: applied as a regular mutation on Keyspace1/Indexed1 with birthdate = 1
+        rm = new RowMutation("Keyspace1", "k1".getBytes());
+        rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0));
+        rm.apply();
+        // k2: row serialized by hand, carrying both "birthdate" and "anydate" (each set to 1)
+        ColumnFamily cf = ColumnFamily.create("Keyspace1", "Indexed1");        
+        cf.addColumn(new Column("birthdate".getBytes(), FBUtilities.toByteArray(1L), new TimestampClock(0)));
+        cf.addColumn(new Column("anydate".getBytes(), FBUtilities.toByteArray(1L), new TimestampClock(0)));
+        
+        Map<byte[], byte[]> entries = new HashMap<byte[], byte[]>();
+        
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+        entries.put("k2".getBytes(), Arrays.copyOf(buffer.getData(), buffer.getLength()));        
+        cf.clear();
+        // k3: the same ColumnFamily object is reused after clear() and given only "anydate", i.e. no birthdate column
+        cf.addColumn(new Column("anydate".getBytes(), FBUtilities.toByteArray(1L), new TimestampClock(0)));
+        buffer = new DataOutputBuffer();
+        ColumnFamily.serializer().serializeWithIndexes(cf, buffer);               
+        entries.put("k3".getBytes(), Arrays.copyOf(buffer.getData(), buffer.getLength()));
+        
+        SSTableReader orig = SSTableUtils.writeRawSSTable("Keyspace1", "Indexed1", entries);        
+        // delete the index and filter files so that recoverAndOpen is exercised (the delete() results are not checked)
+        new File(orig.indexFilename()).delete();
+        new File(orig.filterFilename()).delete();
+        
+        SSTableReader sstr = SSTableWriter.recoverAndOpen(orig.desc);
+        // register the recovered sstable with the column family store that is scanned below
+        ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
+        cfs.addSSTable(sstr);
+        // scan for birthdate == 1; the 100 is presumably the maximum row count -- confirm against IndexClause
+        IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(1L));
+        IndexClause clause = new IndexClause(Arrays.asList(expr), 100);
+        IFilter filter = new IdentityQueryFilter();
+        List<Row> rows = cfs.scan(clause, filter);
+        // k1 (mutation) and k2 (recovered sstable) both have birthdate == 1; k3 has no birthdate column
+        assertEquals("IndexExpression should return two rows on recoverAndOpen",2, rows.size());
+        assertTrue("First result should be 'k1'",Arrays.equals("k1".getBytes(), rows.get(0).key.key));
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java Sat Jul 31 12:51:14 2010
@@ -31,9 +31,10 @@ import org.apache.cassandra.db.Timestamp
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+
 import static org.apache.cassandra.io.sstable.SSTableUtils.tempSSTableFile;
 import static org.apache.cassandra.utils.FBUtilities.bytesToHex;
 import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
@@ -59,8 +60,7 @@ public class SSTableExportTest extends S
     {
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
-        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
         
         // Add rowA
         cfamily.addColumn(new QueryPath("Standard1", null, "colA".getBytes()), "valA".getBytes(), new TimestampClock(1));
@@ -92,8 +92,7 @@ public class SSTableExportTest extends S
     public void testExportSimpleCf() throws IOException    {
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
-        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
         
         // Add rowA
         cfamily.addColumn(new QueryPath("Standard1", null, "colA".getBytes()), "valA".getBytes(), new TimestampClock(1));
@@ -135,8 +134,7 @@ public class SSTableExportTest extends S
     {
         File tempSS = tempSSTableFile("Keyspace1", "Super4");
         ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Super4");
-        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
         
         // Add rowA
         cfamily.addColumn(new QueryPath("Super4", "superA".getBytes(), "colA".getBytes()), "valA".getBytes(), new TimestampClock(1));
@@ -176,8 +174,7 @@ public class SSTableExportTest extends S
     {
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
-        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, partitioner);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
         
         // Add rowA
         cfamily.addColumn(new QueryPath("Standard1", null, "name".getBytes()), "val".getBytes(), new TimestampClock(1));
@@ -197,9 +194,9 @@ public class SSTableExportTest extends S
         
         // Import JSON to another SSTable file
         File tempSS2 = tempSSTableFile("Keyspace1", "Standard1");
-        SSTableImport.importJson(tempJson.getPath(), "Keyspace1", "Standard1", tempSS2.getPath());        
-        
-        reader = SSTableReader.open(tempSS2.getPath(), DatabaseDescriptor.getPartitioner());
+        SSTableImport.importJson(tempJson.getPath(), "Keyspace1", "Standard1", tempSS2.getPath());
+
+        reader = SSTableReader.open(Descriptor.fromFilename(tempSS2.getPath()));
         QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Standard1", null, null), "name".getBytes());
         ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
         assertTrue(cf != null);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java?rev=981046&r1=981045&r2=981046&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java Sat Jul 31 12:51:14 2010
@@ -26,9 +26,9 @@ import org.apache.cassandra.SchemaLoader
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
 import static org.apache.cassandra.io.sstable.SSTableUtils.tempSSTableFile;
@@ -49,7 +49,7 @@ public class SSTableImportTest extends S
         SSTableImport.importJson(jsonUrl, "Keyspace1", "Standard1", tempSS.getPath());
 
         // Verify results
-        SSTableReader reader = SSTableReader.open(tempSS.getPath(), DatabaseDescriptor.getPartitioner());
+        SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
         QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Standard1", null, null), "colAA".getBytes());
         ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
         assert Arrays.equals(cf.getColumn("colAA".getBytes()).value(), hexToBytes("76616c4141"));
@@ -63,7 +63,7 @@ public class SSTableImportTest extends S
         SSTableImport.importJson(jsonUrl, "Keyspace1", "Super4", tempSS.getPath());
         
         // Verify results
-        SSTableReader reader = SSTableReader.open(tempSS.getPath(), DatabaseDescriptor.getPartitioner());
+        SSTableReader reader = SSTableReader.open(Descriptor.fromFilename(tempSS.getPath()));
         QueryFilter qf = QueryFilter.getNamesFilter(Util.dk("rowA"), new QueryPath("Super4", null, null), "superA".getBytes());
         ColumnFamily cf = qf.getSSTableColumnIterator(reader).getColumnFamily();
         IColumn superCol = cf.getColumn("superA".getBytes());