You are viewing a plain-text version of this content. The link to the canonical version (labeled "here" in the original page) is not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/04 03:54:13 UTC

svn commit: r1142530 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/streaming/

Author: jbellis
Date: Mon Jul  4 01:54:12 2011
New Revision: 1142530

URL: http://svn.apache.org/viewvc?rev=1142530&view=rev
Log:
fix index-building status display
patch by Matt Kennedy; reviewed by jbellis for CASSANDRA-2853

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jul  4 01:54:12 2011
@@ -12,6 +12,7 @@
  * Fix race in SystemTable.getCurrentLocalNodeId (CASSANDRA-2824)
  * Correctly set default for replicate_on_write (CASSANDRA-2835)
  * improve nodetool compactionstats formatting (CASSANDRA-2844)
+ * fix index-building status display (CASSANDRA-2853)
 
 
 0.8.1

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Mon Jul  4 01:54:12 2011
@@ -130,6 +130,12 @@ public class ColumnFamilySerializer impl
     public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, boolean fromRemote) throws IOException
     {
         int size = dis.readInt();
+        deserializeColumns(dis, cf, size, intern, fromRemote);
+    }
+
+    /* column count is already read from DataInput */
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean intern, boolean fromRemote) throws IOException
+    {
         ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
         for (int i = 0; i < size; ++i)
         {

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Table.java Mon Jul  4 01:54:12 2011
@@ -631,8 +631,8 @@ public class Table
             return new CompactionInfo(cfs.table.name,
                                       cfs.columnFamily,
                                       CompactionType.INDEX_BUILD,
-                                      iter.getTotalBytes(),
-                                      iter.getBytesRead());
+                                      iter.getBytesRead(),
+                                      iter.getTotalBytes());
         }
 
         public void build()

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jul  4 01:54:12 2011
@@ -35,10 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.TokenRange;
-import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
@@ -101,11 +100,43 @@ public class ColumnFamilyInputFormat ext
 
         try
         {
+            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+            IPartitioner partitioner = null;
+            Range jobRange = null;
+            if (jobKeyRange != null)
+            {
+                partitioner = ConfigHelper.getPartitioner(context.getConfiguration());
+                assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
+                jobRange = new Range(partitioner.getToken(jobKeyRange.start_key),
+                                     partitioner.getToken(jobKeyRange.end_key),
+                                     partitioner);
+            }
+
             List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
             for (TokenRange range : masterRangeNodes)
             {
+                if (jobRange == null)
+                {
                     // for each range, pick a live owner and ask it to compute bite-sized splits
                     splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                }
+                else
+                {
+                    Range dhtRange = new Range(partitioner.getTokenFactory().fromString(range.start_token),
+                                               partitioner.getTokenFactory().fromString(range.end_token),
+                                               partitioner);
+
+                    if (dhtRange.intersects(jobRange))
+                    {
+                        Set<Range> intersections = dhtRange.intersectionWith(jobRange);
+                        assert intersections.size() == 1 : "wrapping ranges not supported";
+                        Range intersection = intersections.iterator().next();
+                        range.start_token = partitioner.getTokenFactory().toString(intersection.left);
+                        range.end_token = partitioner.getTokenFactory().toString(intersection.right);
+                        // for each range, pick a live owner and ask it to compute bite-sized splits
+                        splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                    }
+                }
             }
 
             // wait until we have all the results back

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Jul  4 01:54:12 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.hadoop;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.FBUtilities;
@@ -42,6 +43,7 @@ public class ConfigHelper
     private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
     private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
+    private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
@@ -195,6 +197,51 @@ public class ConfigHelper
         return predicate;
     }
 
+    /**
+     * Set the KeyRange to limit the rows.
+     * @param conf Job configuration you are about to run
+     * @param keyRange
+     */
+    public static void setInputKeyRange(Configuration conf, KeyRange keyRange){
+        conf.set(INPUT_KEYRANGE_CONFIG, keyRangeToString(keyRange));
+    }
+
+    /** may be null if unset */
+    public static KeyRange getInputKeyRange(Configuration conf){
+        String str = conf.get(INPUT_KEYRANGE_CONFIG);
+        return null != str ? keyRangeFromString(str) : null;
+    }
+
+    private static String keyRangeToString(KeyRange keyRange)
+    {
+        assert keyRange != null;
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return FBUtilities.bytesToHex(serializer.serialize(keyRange));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static KeyRange keyRangeFromString(String st)
+    {
+        assert st != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        KeyRange keyRange = new KeyRange();
+        try
+        {
+            deserializer.deserialize(keyRange, FBUtilities.hexToBytes(st));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return keyRange;
+    }
+
     public static String getInputKeyspace(Configuration conf)
     {
         return conf.get(INPUT_KEYSPACE_CONFIG);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Mon Jul  4 01:54:12 2011
@@ -42,27 +42,45 @@ public class IndexHelper
      * @param in the data input from which the bloom filter should be skipped
      * @throws IOException
      */
-    public static void skipBloomFilter(FileDataInput in) throws IOException
+    public static void skipBloomFilter(DataInput in) throws IOException
     {
         /* size of the bloom filter */
         int size = in.readInt();
         /* skip the serialized bloom filter */
-        FileUtils.skipBytesFully(in, size);
+        if (in instanceof FileDataInput)
+        {
+            FileUtils.skipBytesFully(in, size);
+        }
+        else
+        {
+            // skip bytes
+            byte[] skip = new byte[size];
+            in.readFully(skip);
+        }
     }
 
-	/**
-	 * Skip the index
-	 * @param file the data input from which the index should be skipped
-	 * @throws IOException if an I/O error occurs.
-	 */
-	public static void skipIndex(FileDataInput file) throws IOException
-	{
+    /**
+     * Skip the index
+     * @param in the data input from which the index should be skipped
+     * @throws IOException if an I/O error occurs.
+     */
+    public static void skipIndex(DataInput in) throws IOException
+    {
         /* read only the column index list */
-        int columnIndexSize = file.readInt();
+        int columnIndexSize = in.readInt();
         /* skip the column index data */
-        FileUtils.skipBytesFully(file, columnIndexSize);
-	}
-    
+        if (in instanceof FileDataInput)
+        {
+            FileUtils.skipBytesFully(in, columnIndexSize);
+        }
+        else
+        {
+            // skip bytes
+            byte[] skip = new byte[columnIndexSize];
+            in.readFully(skip);
+        }
+    }
+
     /**
      * Deserialize the index into a structure and return it
      *

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Mon Jul  4 01:54:12 2011
@@ -21,11 +21,11 @@ package org.apache.cassandra.io.sstable;
  */
 
 
+import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.Filter;
+import org.apache.cassandra.utils.BytesReadTracker;
 
 public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
 {
@@ -45,14 +45,16 @@ public class SSTableIdentityIterator imp
 
     private final DecoratedKey key;
     private final long finishedAt;
-    private final BufferedRandomAccessFile file;
+    private final DataInput input;
     private final long dataStart;
     public final long dataSize;
     public final boolean fromRemote;
 
     private final ColumnFamily columnFamily;
     public final int columnCount;
-    private final long columnPosition;
+    private long columnPosition;
+
+    private BytesReadTracker inputWithTracker; // tracks bytes read
 
     // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
     private final int expireBefore;
@@ -90,17 +92,18 @@ public class SSTableIdentityIterator imp
         this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
     }
 
-    public SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
+    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
     throws IOException
     {
         this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
     }
 
     // sstable may be null *if* deserializeRowHeader is false
-    private SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+    private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
     throws IOException
     {
-        this.file = file;
+        this.input = input;
+        this.inputWithTracker = new BytesReadTracker(input);
         this.key = key;
         this.dataStart = dataStart;
         this.dataSize = dataSize;
@@ -111,38 +114,47 @@ public class SSTableIdentityIterator imp
 
         try
         {
-            file.seek(this.dataStart);
-            if (checkData)
+            if (input instanceof BufferedRandomAccessFile)
             {
-                try
-                {
-                    IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
-                }
-                catch (Exception e)
-                {
-                    if (e instanceof EOFException)
-                        throw (EOFException) e;
-
-                    logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
-                    // deFreeze should have left the file position ready to deserialize index
-                }
-                try
-                {
-                    IndexHelper.deserializeIndex(file);
-                }
-                catch (Exception e)
+                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+                file.seek(this.dataStart);
+                if (checkData)
                 {
-                    logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+                    try
+                    {
+                        IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
+                    }
+                    catch (Exception e)
+                    {
+                        if (e instanceof EOFException)
+                            throw (EOFException) e;
+
+                        logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
+                        // deFreeze should have left the file position ready to deserialize index
+                    }
+                    try
+                    {
+                        IndexHelper.deserializeIndex(file);
+                    }
+                    catch (Exception e)
+                    {
+                        logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+                    }
+                    file.seek(this.dataStart);
                 }
-                file.seek(this.dataStart);
             }
 
-            IndexHelper.skipBloomFilter(file);
-            IndexHelper.skipIndex(file);
+            IndexHelper.skipBloomFilter(inputWithTracker);
+            IndexHelper.skipIndex(inputWithTracker);
             columnFamily = ColumnFamily.create(metadata);
-            ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, file);
-            columnCount = file.readInt();
-            columnPosition = file.getFilePointer();
+            ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker);
+            columnCount = inputWithTracker.readInt();
+
+            if (input instanceof BufferedRandomAccessFile)
+            {
+                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+                columnPosition = file.getFilePointer();
+            }
         }
         catch (IOException e)
         {
@@ -162,14 +174,22 @@ public class SSTableIdentityIterator imp
 
     public boolean hasNext()
     {
-        return file.getFilePointer() < finishedAt;
+        if (input instanceof BufferedRandomAccessFile)
+        {
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            return file.getFilePointer() < finishedAt;
+        }
+        else
+        {
+            return inputWithTracker.getBytesRead() < dataSize;
+        }
     }
 
     public IColumn next()
     {
         try
         {
-            IColumn column = columnFamily.getColumnSerializer().deserialize(file, null, fromRemote, expireBefore);
+            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, null, fromRemote, expireBefore);
             if (validateColumns)
                 column.validateFields(columnFamily.metadata());
             return column;
@@ -196,23 +216,50 @@ public class SSTableIdentityIterator imp
 
     public String getPath()
     {
-        return file.getPath();
+        // if input is from file, then return that path, otherwise it's from streaming
+        if (input instanceof BufferedRandomAccessFile)
+        {
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            return file.getPath();
+        }
+        else
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
     public void echoData(DataOutput out) throws IOException
     {
-        file.seek(dataStart);
-        while (file.getFilePointer() < finishedAt)
+        // only effective when input is from file
+        if (input instanceof BufferedRandomAccessFile)
         {
-            out.write(file.readByte());
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            file.seek(dataStart);
+            while (file.getFilePointer() < finishedAt)
+            {
+                out.write(file.readByte());
+            }
+        }
+        else
+        {
+            throw new UnsupportedOperationException();
         }
     }
 
     public ColumnFamily getColumnFamilyWithColumns() throws IOException
     {
-        file.seek(columnPosition - 4); // seek to before column count int
         ColumnFamily cf = columnFamily.cloneMeShallow();
-        ColumnFamily.serializer().deserializeColumns(file, cf, false, fromRemote);
+        if (input instanceof BufferedRandomAccessFile)
+        {
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            file.seek(columnPosition - 4); // seek to before column count int
+            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, false, fromRemote);
+        }
+        else
+        {
+            // since we already read column count, just pass that value and continue deserialization
+            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote);
+        }
         if (validateColumns)
         {
             try
@@ -234,13 +281,23 @@ public class SSTableIdentityIterator imp
 
     public void reset()
     {
-        try
+        // only effective when input is from file
+        if (input instanceof BufferedRandomAccessFile)
         {
-            file.seek(columnPosition);
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            try
+            {
+                file.seek(columnPosition);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            inputWithTracker.reset();
         }
-        catch (IOException e)
+        else
         {
-            throw new IOError(e);
+            throw new UnsupportedOperationException();
         }
     }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Mon Jul  4 01:54:12 2011
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.streaming;
 
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -25,11 +27,27 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
 public class IncomingStreamReader
@@ -70,51 +88,162 @@ public class IncomingStreamReader
             logger.debug("Receiving stream");
             logger.debug("Creating file for {}", localFile.getFilename());
         }
-        FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
-        FileChannel fc = fos.getChannel();
 
-        long offset = 0;
-        try
+        SSTableReader reader = null;
+        if (remoteFile.estimatedKeys > 0)
         {
-            for (Pair<Long, Long> section : localFile.sections)
+            logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
+            DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream());
+            try
             {
-                long length = section.right - section.left;
-                long bytesRead = 0;
-                while (bytesRead < length)
-                {
-                    bytesRead = readnwrite(length, bytesRead, offset, fc);
-                }
-                offset += length;
+                reader = streamIn(dis, localFile, remoteFile);
+            }
+            catch (IOException ex)
+            {
+                retry();
+                throw ex;
+            }
+            finally
+            {
+                dis.close();
             }
         }
-        catch (IOException ex)
+        else
         {
-            /* Ask the source node to re-stream this file. */
-            session.retry(remoteFile);
+            FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
+            FileChannel fc = fos.getChannel();
 
-            /* Delete the orphaned file. */
-            FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
-            throw ex;
-        }
-        finally
-        {
-            fc.close();
+            long offset = 0;
+            try
+            {
+                for (Pair<Long, Long> section : localFile.sections)
+                {
+                    long length = section.right - section.left;
+                    long bytesRead = 0;
+                    while (bytesRead < length)
+                    {
+                        bytesRead = readnwrite(length, bytesRead, offset, fc);
+                    }
+                    offset += length;
+                }
+            }
+            catch (IOException ex)
+            {
+                retry();
+                throw ex;
+            }
+            finally
+            {
+                fc.close();
+            }
         }
 
-        session.finished(remoteFile, localFile);
+        session.finished(remoteFile, localFile, reader);
     }
 
     protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
     {
         long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
         long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
-	// if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
-	// and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and 
-	// raise an exception (that will be catch belove and 'the right thing' will be done).
-	if (lastRead == 0)
-		throw new IOException("Transfer failed for remote file " + remoteFile);
+        // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
+        // and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and 
+        // raise an exception (that will be catch belove and 'the right thing' will be done).
+        if (lastRead == 0)
+            throw new IOException("Transfer failed for remote file " + remoteFile);
         bytesRead += lastRead;
         remoteFile.progress += lastRead;
         return bytesRead;
     }
+
+    private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
+    {
+        ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
+        DecoratedKey key;
+        SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
+        CompactionController controller = null;
+
+        BytesReadTracker in = new BytesReadTracker(input);
+
+        for (Pair<Long, Long> section : localFile.sections)
+        {
+            long length = section.right - section.left;
+            long bytesRead = 0;
+            while (bytesRead < length)
+            {
+                key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
+                long dataSize = SSTableReader.readRowSize(in, localFile.desc);
+                ColumnFamily cf = null;
+                if (cfs.metadata.getDefaultValidator().isCommutative())
+                {
+                    // take care of counter column family
+                    if (controller == null)
+                        controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
+                    SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+                    AbstractCompactedRow row = controller.getCompactedRow(iter);
+                    writer.append(row);
+
+                    if (row instanceof PrecompactedRow)
+                    {
+                        // we do not purge so we should not get a null here
+                        cf = ((PrecompactedRow)row).getFullColumnFamily();
+                    }
+                }
+                else
+                {
+                    // skip BloomFilter
+                    IndexHelper.skipBloomFilter(in);
+                    // skip Index
+                    IndexHelper.skipIndex(in);
+
+                    // restore ColumnFamily
+                    cf = ColumnFamily.create(cfs.metadata);
+                    ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
+                    ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
+
+                    // write key and cf
+                    writer.append(key, cf);
+                }
+
+                // update cache
+                ColumnFamily cached = cfs.getRawCachedRow(key);
+                if (cached != null)
+                {
+                    switch (remoteFile.type)
+                    {
+                        case AES:
+                            if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+                            {
+                                // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
+                                // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
+                                logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+                                cfs.invalidateCachedRow(key);
+                            }
+                            else
+                            {
+                                assert cf != null;
+                                cfs.updateRowCache(key, cf);
+                            }
+                            break;
+                        default:
+                            cfs.invalidateCachedRow(key);
+                            break;
+                    }
+                }
+
+                bytesRead += in.getBytesRead();
+                remoteFile.progress += in.getBytesRead();
+            }
+        }
+
+        return writer.closeAndOpenReader();
+    }
+
+    private void retry() throws IOException
+    {
+        /* Ask the source node to re-stream this file. */
+        session.retry(remoteFile);
+
+        /* Delete the orphaned file. */
+        FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java Mon Jul  4 01:54:12 2011
@@ -53,15 +53,21 @@ public class PendingFile
     public final List<Pair<Long,Long>> sections;
     public final OperationType type;
     public final long size;
+    public final long estimatedKeys;
     public long progress;
 
     public PendingFile(Descriptor desc, PendingFile pf)
     {
-        this(null, desc, pf.component, pf.sections, pf.type);
+        this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys);
     }
 
     public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
     {
+        this(sstable, desc, component, sections, type, 0);
+    }
+    
+    public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
+    {
         this.sstable = sstable;
         this.desc = desc;
         this.component = component;
@@ -74,6 +80,8 @@ public class PendingFile
             tempSize += section.right - section.left;
         }
         size = tempSize;
+
+        this.estimatedKeys = estimatedKeys;
     }
 
     public String getFilename()
@@ -119,6 +127,7 @@ public class PendingFile
             }
             if (version > MessagingService.VERSION_07)
                 dos.writeUTF(sc.type.name());
+            dos.writeLong(sc.estimatedKeys);
         }
 
         public PendingFile deserialize(DataInputStream dis, int version) throws IOException
@@ -137,7 +146,8 @@ public class PendingFile
             OperationType type = OperationType.RESTORE_REPLICA_COUNT;
             if (version > MessagingService.VERSION_07)
                 type = OperationType.valueOf(dis.readUTF());
-            return new PendingFile(null, desc, component, sections, type);
+            long estimatedKeys = dis.readLong();
+            return new PendingFile(null, desc, component, sections, type, estimatedKeys);
         }
     }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java Mon Jul  4 01:54:12 2011
@@ -51,6 +51,7 @@ public class StreamInSession
     private final Runnable callback;
     private String table;
     private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>();
+    private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -102,13 +103,21 @@ public class StreamInSession
         }
     }
 
-    public void finished(PendingFile remoteFile, PendingFile localFile) throws IOException
+    public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException
     {
         if (logger.isDebugEnabled())
             logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
 
-        Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
-        buildFutures.add(future);
+        if (reader != null)
+        {
+            // SSTR was already built during streaming
+            readers.add(reader);
+        }
+        else
+        {
+            Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
+            buildFutures.add(future);
+        }
 
         files.remove(remoteFile);
         if (remoteFile.equals(current))
@@ -136,14 +145,7 @@ public class StreamInSession
                 try
                 {
                     SSTableReader sstable = future.get();
-                    assert sstable.getTableName().equals(table);
-                    if (sstable == null)
-                        continue;
-                    ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
-                    cfs.addSSTable(sstable);
-                    if (!cfstores.containsKey(cfs))
-                        cfstores.put(cfs, new ArrayList<SSTableReader>());
-                    cfstores.get(cfs).add(sstable);
+                    readers.add(sstable);
                 }
                 catch (InterruptedException e)
                 {
@@ -154,6 +156,18 @@ public class StreamInSession
                     throw new RuntimeException(e);
                 }
             }
+            
+            for (SSTableReader sstable : readers)
+            {
+                assert sstable.getTableName().equals(table);
+                if (sstable == null)
+                    continue;
+                ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+                cfs.addSSTable(sstable);
+                if (!cfstores.containsKey(cfs))
+                    cfstores.put(cfs, new ArrayList<SSTableReader>());
+                cfstores.get(cfs).add(sstable);
+            }
 
             // build secondary indexes
             for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1142530&r1=1142529&r2=1142530&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java Mon Jul  4 01:54:12 2011
@@ -176,7 +176,7 @@ public class StreamOut
             List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
             if (sections.isEmpty())
                 continue;
-            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
+            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeys()));
         }
         logger.info("Stream context metadata {}, {} sstables.", pending, sstables.size());
         return pending;