Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/13 18:24:31 UTC

svn commit: r1146117 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/streaming/

Author: jbellis
Date: Wed Jul 13 16:24:31 2011
New Revision: 1146117

URL: http://svn.apache.org/viewvc?rev=1146117&view=rev
Log:
single-pass streaming
patch by Yuki Morishita; reviewed by jbellis for CASSANDRA-2677
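
For orientation, here is the new receive path in condensed form (pulled from IncomingStreamReader.streamIn further down; the counter-column branch, row cache maintenance, progress accounting and error handling are omitted, and variables such as cfs, localFile, remoteFile and input come from that method's context). Instead of writing raw bytes to disk and then rebuilding the bloom filter and index with SSTableWriter.Builder, the receiver now deserializes each row straight off the socket and appends it through SSTableWriter, which builds both in the same pass:

    SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
    BytesReadTracker in = new BytesReadTracker(input);
    for (Pair<Long, Long> section : localFile.sections)
    {
        long length = section.right - section.left;
        long bytesRead = 0;
        while (bytesRead < length)
        {
            in.reset();
            // row header: key, then on-disk row size (only needed for the omitted row cache check)
            DecoratedKey key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc,
                                                       ByteBufferUtil.readWithShortLength(in));
            long dataSize = SSTableReader.readRowSize(in, localFile.desc);
            // the per-row bloom filter and column index from the sender are skipped; the writer rebuilds them
            IndexHelper.skipBloomFilter(in);
            IndexHelper.skipIndex(in);
            // deserialize the row and append it to the new sstable
            ColumnFamily cf = ColumnFamily.create(cfs.metadata);
            ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
            ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
            writer.append(key, cf);
            bytesRead += in.getBytesRead();
        }
    }
    SSTableReader reader = writer.closeAndOpenReader();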

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jul 13 16:24:31 2011
@@ -12,6 +12,7 @@
  * don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
  * reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
  * optimize away seek when compacting wide rows (CASSANDRA-2879)
+ * single-pass streaming (CASSANDRA-2677)
 
 
 0.8.2

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Jul 13 16:24:31 2011
@@ -131,6 +131,12 @@ public class ColumnFamilySerializer impl
     public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, boolean fromRemote) throws IOException
     {
         int size = dis.readInt();
+        deserializeColumns(dis, cf, size, intern, fromRemote);
+    }
+
+    /* column count is already read from DataInput */
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean intern, boolean fromRemote) throws IOException
+    {
         ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
         for (int i = 0; i < size; ++i)
         {
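
A hedged usage sketch of the two overloads; in and cf are placeholders for a DataInput positioned at a column block and a ColumnFamily created with the matching metadata, and the boolean arguments are illustrative. The two calls are alternatives over the same stream position, not sequential code: the existing four-argument form reads the column count itself, while the new five-argument form is for callers such as SSTableIdentityIterator that have already consumed the count.

    // alternative A: the serializer reads the column count itself (existing overload)
    ColumnFamily.serializer().deserializeColumns(in, cf, false, true);

    // alternative B: the caller has already consumed the count (as SSTableIdentityIterator
    // does while tracking bytes read) and passes it explicitly via the new overload
    int columnCount = in.readInt();
    ColumnFamily.serializer().deserializeColumns(in, cf, columnCount, false, true);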

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Wed Jul 13 16:24:31 2011
@@ -42,27 +42,45 @@ public class IndexHelper
      * @param in the data input from which the bloom filter should be skipped
      * @throws IOException
      */
-    public static void skipBloomFilter(FileDataInput in) throws IOException
+    public static void skipBloomFilter(DataInput in) throws IOException
     {
         /* size of the bloom filter */
         int size = in.readInt();
         /* skip the serialized bloom filter */
-        FileUtils.skipBytesFully(in, size);
+        if (in instanceof FileDataInput)
+        {
+            FileUtils.skipBytesFully(in, size);
+        }
+        else
+        {
+            // skip bytes
+            byte[] skip = new byte[size];
+            in.readFully(skip);
+        }
     }
 
-	/**
-	 * Skip the index
-	 * @param file the data input from which the index should be skipped
-	 * @throws IOException if an I/O error occurs.
-	 */
-	public static void skipIndex(FileDataInput file) throws IOException
-	{
+    /**
+     * Skip the index
+     * @param in the data input from which the index should be skipped
+     * @throws IOException if an I/O error occurs.
+     */
+    public static void skipIndex(DataInput in) throws IOException
+    {
         /* read only the column index list */
-        int columnIndexSize = file.readInt();
+        int columnIndexSize = in.readInt();
         /* skip the column index data */
-        FileUtils.skipBytesFully(file, columnIndexSize);
-	}
-    
+        if (in instanceof FileDataInput)
+        {
+            FileUtils.skipBytesFully(in, columnIndexSize);
+        }
+        else
+        {
+            // skip bytes
+            byte[] skip = new byte[columnIndexSize];
+            in.readFully(skip);
+        }
+    }
+
     /**
      * Deserialize the index into a structure and return it
      *
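
For a non-seekable DataInput the committed code skips a block by allocating a buffer of the advertised size and reading into it. A hedged alternative sketch, not part of this patch (the helper name is made up), avoids the full-size allocation by looping over DataInput.skipBytes, whose contract allows it to skip fewer bytes than requested:

    // Skip exactly 'bytes' bytes from a DataInput that cannot seek.
    private static void skipFully(DataInput in, int bytes) throws IOException
    {
        int remaining = bytes;
        while (remaining > 0)
        {
            int skipped = in.skipBytes(remaining);
            if (skipped <= 0)
                break; // no progress; fall back to reading the rest
            remaining -= skipped;
        }
        if (remaining > 0)
            in.readFully(new byte[remaining]); // throws EOFException if the stream ends early
    }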

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Jul 13 16:24:31 2011
@@ -21,11 +21,11 @@ package org.apache.cassandra.io.sstable;
  */
 
 
+import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.Filter;
+import org.apache.cassandra.utils.BytesReadTracker;
 
 public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
 {
@@ -45,14 +45,16 @@ public class SSTableIdentityIterator imp
 
     private final DecoratedKey key;
     private final long finishedAt;
-    private final BufferedRandomAccessFile file;
+    private final DataInput input;
     private final long dataStart;
     public final long dataSize;
     public final boolean fromRemote;
 
     private final ColumnFamily columnFamily;
     public final int columnCount;
-    private final long columnPosition;
+    private long columnPosition;
+
+    private BytesReadTracker inputWithTracker; // tracks bytes read
 
     // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
     private final int expireBefore;
@@ -90,17 +92,18 @@ public class SSTableIdentityIterator imp
         this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
     }
 
-    public SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
+    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
     throws IOException
     {
         this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
     }
 
     // sstable may be null *if* deserializeRowHeader is false
-    private SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+    private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
     throws IOException
     {
-        this.file = file;
+        this.input = input;
+        this.inputWithTracker = new BytesReadTracker(input);
         this.key = key;
         this.dataStart = dataStart;
         this.dataSize = dataSize;
@@ -111,38 +114,47 @@ public class SSTableIdentityIterator imp
 
         try
         {
-            file.seek(this.dataStart);
-            if (checkData)
+            if (input instanceof BufferedRandomAccessFile)
             {
-                try
-                {
-                    IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
-                }
-                catch (Exception e)
-                {
-                    if (e instanceof EOFException)
-                        throw (EOFException) e;
-
-                    logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
-                    // deFreeze should have left the file position ready to deserialize index
-                }
-                try
-                {
-                    IndexHelper.deserializeIndex(file);
-                }
-                catch (Exception e)
+                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+                file.seek(this.dataStart);
+                if (checkData)
                 {
-                    logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+                    try
+                    {
+                        IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
+                    }
+                    catch (Exception e)
+                    {
+                        if (e instanceof EOFException)
+                            throw (EOFException) e;
+
+                        logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
+                        // deFreeze should have left the file position ready to deserialize index
+                    }
+                    try
+                    {
+                        IndexHelper.deserializeIndex(file);
+                    }
+                    catch (Exception e)
+                    {
+                        logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+                    }
+                    file.seek(this.dataStart);
                 }
-                file.seek(this.dataStart);
             }
 
-            IndexHelper.skipBloomFilter(file);
-            IndexHelper.skipIndex(file);
+            IndexHelper.skipBloomFilter(inputWithTracker);
+            IndexHelper.skipIndex(inputWithTracker);
             columnFamily = ColumnFamily.create(metadata);
-            ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, file);
-            columnCount = file.readInt();
-            columnPosition = file.getFilePointer();
+            ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker);
+            columnCount = inputWithTracker.readInt();
+
+            if (input instanceof BufferedRandomAccessFile)
+            {
+                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+                columnPosition = file.getFilePointer();
+            }
         }
         catch (IOException e)
         {
@@ -162,14 +174,22 @@ public class SSTableIdentityIterator imp
 
     public boolean hasNext()
     {
-        return file.getFilePointer() < finishedAt;
+        if (input instanceof BufferedRandomAccessFile)
+        {
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            return file.getFilePointer() < finishedAt;
+        }
+        else
+        {
+            return inputWithTracker.getBytesRead() < dataSize;
+        }
     }
 
     public IColumn next()
     {
         try
         {
-            IColumn column = columnFamily.getColumnSerializer().deserialize(file, null, fromRemote, expireBefore);
+            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, null, fromRemote, expireBefore);
             if (validateColumns)
                 column.validateFields(columnFamily.metadata());
             return column;
@@ -196,23 +216,50 @@ public class SSTableIdentityIterator imp
 
     public String getPath()
     {
-        return file.getPath();
+        // if input is from file, then return that path, otherwise it's from streaming
+        if (input instanceof BufferedRandomAccessFile)
+        {
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            return file.getPath();
+        }
+        else
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
     public void echoData(DataOutput out) throws IOException
     {
-        file.seek(dataStart);
-        while (file.getFilePointer() < finishedAt)
+        // only effective when input is from file
+        if (input instanceof BufferedRandomAccessFile)
         {
-            out.write(file.readByte());
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            file.seek(dataStart);
+            while (file.getFilePointer() < finishedAt)
+            {
+                out.write(file.readByte());
+            }
+        }
+        else
+        {
+            throw new UnsupportedOperationException();
         }
     }
 
     public ColumnFamily getColumnFamilyWithColumns() throws IOException
     {
-        file.seek(columnPosition - 4); // seek to before column count int
         ColumnFamily cf = columnFamily.cloneMeShallow();
-        ColumnFamily.serializer().deserializeColumns(file, cf, false, fromRemote);
+        if (input instanceof BufferedRandomAccessFile)
+        {
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            file.seek(columnPosition - 4); // seek to before column count int
+            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, false, fromRemote);
+        }
+        else
+        {
+            // since we already read column count, just pass that value and continue deserialization
+            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote);
+        }
         if (validateColumns)
         {
             try
@@ -234,13 +281,23 @@ public class SSTableIdentityIterator imp
 
     public void reset()
     {
-        try
+        // only effective when input is from file
+        if (input instanceof BufferedRandomAccessFile)
         {
-            file.seek(columnPosition);
+            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+            try
+            {
+                file.seek(columnPosition);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            inputWithTracker.reset();
         }
-        catch (IOException e)
+        else
         {
-            throw new IOError(e);
+            throw new UnsupportedOperationException();
         }
     }
 }
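
The iterator above leans on org.apache.cassandra.utils.BytesReadTracker, whose source is not part of this diff. As an illustration only, a minimal sketch of what such a byte-counting wrapper looks like for the calls used here; the real class implements java.io.DataInput and delegates every method of that interface the same way, and this trimmed version and its class name are not the committed implementation.

    public class CountingInput
    {
        private final DataInput source;
        private long bytesRead = 0;

        public CountingInput(DataInput source)
        {
            this.source = source;
        }

        public long getBytesRead()
        {
            return bytesRead;
        }

        public void reset()
        {
            bytesRead = 0;
        }

        public int readInt() throws IOException
        {
            int value = source.readInt();
            bytesRead += 4; // DataInput encodes an int in exactly four bytes
            return value;
        }

        public void readFully(byte[] buffer) throws IOException
        {
            source.readFully(buffer);
            bytesRead += buffer.length;
        }
    }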

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Jul 13 16:24:31 2011
@@ -382,6 +382,19 @@ public class SSTableReader extends SSTab
     {
         return indexSummary.getIndexPositions().size() * DatabaseDescriptor.getIndexInterval();
     }
+    
+    /**
+     * @param ranges
+     * @return An estimate of the number of keys for given ranges in this SSTable.
+     */
+    public long estimatedKeysForRanges(Collection<Range> ranges)
+    {
+        long sampleKeyCount = 0;
+        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary.getIndexPositions(), ranges);
+        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
+            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
+        return sampleKeyCount * DatabaseDescriptor.getIndexInterval();
+    }
 
     /**
      * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
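
A worked example of the estimate, with illustrative figures: getSampleIndexesForRanges returns start/end positions into the index summary, so a single pair (3, 7) covers 7 - 3 + 1 = 5 summary entries; with the default index interval of 128 the method returns 5 * 128 = 640 estimated keys. StreamOut records this value as PendingFile.estimatedKeys and IncomingStreamReader hands it to the new SSTableWriter so the receiving sstable can be sized appropriately up front.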

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Jul 13 16:24:31 2011
@@ -277,6 +277,8 @@ public class SSTableWriter extends SSTab
     /**
      * Removes the given SSTable from temporary status and opens it, rebuilding the
      * bloom filter and row index from the data file.
+     *
+     * TODO remove this post-1.0, we have one-pass streaming now (see IncomingStreamReader)
      */
     public static class Builder implements CompactionInfo.Holder
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jul 13 16:24:31 2011
@@ -18,18 +18,32 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
 public class IncomingStreamReader
@@ -70,51 +84,163 @@ public class IncomingStreamReader
             logger.debug("Receiving stream");
             logger.debug("Creating file for {}", localFile.getFilename());
         }
-        FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
-        FileChannel fc = fos.getChannel();
 
-        long offset = 0;
-        try
+        SSTableReader reader = null;
+        if (remoteFile.estimatedKeys > 0)
         {
-            for (Pair<Long, Long> section : localFile.sections)
+            logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
+            DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream());
+            try
             {
-                long length = section.right - section.left;
-                long bytesRead = 0;
-                while (bytesRead < length)
-                {
-                    bytesRead = readnwrite(length, bytesRead, offset, fc);
-                }
-                offset += length;
+                reader = streamIn(dis, localFile, remoteFile);
+            }
+            catch (IOException ex)
+            {
+                retry();
+                throw ex;
+            }
+            finally
+            {
+                dis.close();
             }
         }
-        catch (IOException ex)
+        else
         {
-            /* Ask the source node to re-stream this file. */
-            session.retry(remoteFile);
+            // backwards compatibility path
+            FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
+            FileChannel fc = fos.getChannel();
 
-            /* Delete the orphaned file. */
-            FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
-            throw ex;
-        }
-        finally
-        {
-            fc.close();
+            long offset = 0;
+            try
+            {
+                for (Pair<Long, Long> section : localFile.sections)
+                {
+                    long length = section.right - section.left;
+                    long bytesRead = 0;
+                    while (bytesRead < length)
+                    {
+                        bytesRead = readnwrite(length, bytesRead, offset, fc);
+                    }
+                    offset += length;
+                }
+            }
+            catch (IOException ex)
+            {
+                retry();
+                throw ex;
+            }
+            finally
+            {
+                fc.close();
+            }
         }
 
-        session.finished(remoteFile, localFile);
+        session.finished(remoteFile, localFile, reader);
     }
 
     protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
     {
         long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
         long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
-	// if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
-	// and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and 
-	// raise an exception (that will be catch belove and 'the right thing' will be done).
-	if (lastRead == 0)
-		throw new IOException("Transfer failed for remote file " + remoteFile);
+        // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 bytes read
+        // and we would thus enter an infinite loop. So instead, if no bytes are transferred we assume the other side is dead and
+        // raise an exception (that will be caught below and 'the right thing' will be done).
+        if (lastRead == 0)
+            throw new IOException("Transfer failed for remote file " + remoteFile);
         bytesRead += lastRead;
         remoteFile.progress += lastRead;
         return bytesRead;
     }
+
+    private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
+    {
+        ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
+        DecoratedKey key;
+        SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
+        CompactionController controller = null;
+
+        BytesReadTracker in = new BytesReadTracker(input);
+
+        for (Pair<Long, Long> section : localFile.sections)
+        {
+            long length = section.right - section.left;
+            long bytesRead = 0;
+            while (bytesRead < length)
+            {
+                in.reset();
+                key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
+                long dataSize = SSTableReader.readRowSize(in, localFile.desc);
+                ColumnFamily cf = null;
+                if (cfs.metadata.getDefaultValidator().isCommutative())
+                {
+                    // take care of counter column family
+                    if (controller == null)
+                        controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
+                    SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+                    AbstractCompactedRow row = controller.getCompactedRow(iter);
+                    writer.append(row);
+
+                    if (row instanceof PrecompactedRow)
+                    {
+                        // we do not purge so we should not get a null here
+                        cf = ((PrecompactedRow)row).getFullColumnFamily();
+                    }
+                }
+                else
+                {
+                    // skip BloomFilter
+                    IndexHelper.skipBloomFilter(in);
+                    // skip Index
+                    IndexHelper.skipIndex(in);
+
+                    // restore ColumnFamily
+                    cf = ColumnFamily.create(cfs.metadata);
+                    ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
+                    ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
+
+                    // write key and cf
+                    writer.append(key, cf);
+                }
+
+                // update cache
+                ColumnFamily cached = cfs.getRawCachedRow(key);
+                if (cached != null)
+                {
+                    switch (remoteFile.type)
+                    {
+                        case AES:
+                            if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+                            {
+                                // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
+                                // from being built (and there is no real point anyway), so we just invalidate the row for correctness and log a warning.
+                                logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuild; it is highly recommended to avoid huge rows on column families with row cache enabled.");
+                                cfs.invalidateCachedRow(key);
+                            }
+                            else
+                            {
+                                assert cf != null;
+                                cfs.updateRowCache(key, cf);
+                            }
+                            break;
+                        default:
+                            cfs.invalidateCachedRow(key);
+                            break;
+                    }
+                }
+
+                bytesRead += in.getBytesRead();
+                remoteFile.progress += in.getBytesRead();
+            }
+        }
+        return writer.closeAndOpenReader();
+    }
+
+    private void retry() throws IOException
+    {
+        /* Ask the source node to re-stream this file. */
+        session.retry(remoteFile);
+
+        /* Delete the orphaned file. */
+        FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Wed Jul 13 16:24:31 2011
@@ -53,15 +53,21 @@ public class PendingFile
     public final List<Pair<Long,Long>> sections;
     public final OperationType type;
     public final long size;
+    public final long estimatedKeys;
     public long progress;
 
     public PendingFile(Descriptor desc, PendingFile pf)
     {
-        this(null, desc, pf.component, pf.sections, pf.type);
+        this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys);
     }
 
     public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
     {
+        this(sstable, desc, component, sections, type, 0);
+    }
+    
+    public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
+    {
         this.sstable = sstable;
         this.desc = desc;
         this.component = component;
@@ -74,6 +80,8 @@ public class PendingFile
             tempSize += section.right - section.left;
         }
         size = tempSize;
+
+        this.estimatedKeys = estimatedKeys;
     }
 
     public String getFilename()
@@ -119,6 +127,8 @@ public class PendingFile
             }
             if (version > MessagingService.VERSION_07)
                 dos.writeUTF(sc.type.name());
+            if (version > MessagingService.VERSION_080)
+                dos.writeLong(sc.estimatedKeys);
         }
 
         public PendingFile deserialize(DataInputStream dis, int version) throws IOException
@@ -137,7 +147,10 @@ public class PendingFile
             OperationType type = OperationType.RESTORE_REPLICA_COUNT;
             if (version > MessagingService.VERSION_07)
                 type = OperationType.valueOf(dis.readUTF());
-            return new PendingFile(null, desc, component, sections, type);
+            long estimatedKeys = 0;
+            if (version > MessagingService.VERSION_080)
+                estimatedKeys = dis.readLong();
+            return new PendingFile(null, desc, component, sections, type, estimatedKeys);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Jul 13 16:24:31 2011
@@ -51,6 +51,7 @@ public class StreamInSession
     private final Runnable callback;
     private String table;
     private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>();
+    private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -102,13 +103,21 @@ public class StreamInSession
         }
     }
 
-    public void finished(PendingFile remoteFile, PendingFile localFile) throws IOException
+    public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException
     {
         if (logger.isDebugEnabled())
             logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
 
-        Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
-        buildFutures.add(future);
+        if (reader != null)
+        {
+            // SSTR was already built during streaming
+            readers.add(reader);
+        }
+        else
+        {
+            Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
+            buildFutures.add(future);
+        }
 
         files.remove(remoteFile);
         if (remoteFile.equals(current))
@@ -152,6 +161,16 @@ public class StreamInSession
                     throw new RuntimeException(e);
                 }
             }
+            
+            for (SSTableReader sstable : readers)
+            {
+                assert sstable.getTableName().equals(table);
+                ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+                cfs.addSSTable(sstable);
+                if (!cfstores.containsKey(cfs))
+                    cfstores.put(cfs, new ArrayList<SSTableReader>());
+                cfstores.get(cfs).add(sstable);
+            }
 
             // build secondary indexes
             for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed Jul 13 16:24:31 2011
@@ -151,7 +151,7 @@ public class StreamOut
             List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
             if (sections.isEmpty())
                 continue;
-            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
+            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeysForRanges(ranges)));
         }
         logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));
         return pending;