You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/04 15:02:06 UTC

svn commit: r1142647 - in /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra: db/ hadoop/ io/sstable/ streaming/

Author: jbellis
Date: Mon Jul  4 13:02:05 2011
New Revision: 1142647

URL: http://svn.apache.org/viewvc?rev=1142647&view=rev
Log:
revert incomplete changes

Modified:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Mon Jul  4 13:02:05 2011
@@ -130,12 +130,6 @@ public class ColumnFamilySerializer impl
     public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, boolean fromRemote) throws IOException
     {
         int size = dis.readInt();
-        deserializeColumns(dis, cf, size, intern, fromRemote);
-    }
-
-    /* column count is already read from DataInput */
-    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean intern, boolean fromRemote) throws IOException
-    {
         ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
         for (int i = 0; i < size; ++i)
         {

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jul  4 13:02:05 2011
@@ -35,9 +35,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
@@ -100,43 +101,11 @@ public class ColumnFamilyInputFormat ext
 
         try
         {
-            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
-            IPartitioner partitioner = null;
-            Range jobRange = null;
-            if (jobKeyRange != null)
-            {
-                partitioner = ConfigHelper.getPartitioner(context.getConfiguration());
-                assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
-                jobRange = new Range(partitioner.getToken(jobKeyRange.start_key),
-                                     partitioner.getToken(jobKeyRange.end_key),
-                                     partitioner);
-            }
-
             List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
             for (TokenRange range : masterRangeNodes)
             {
-                if (jobRange == null)
-                {
                     // for each range, pick a live owner and ask it to compute bite-sized splits
                     splitfutures.add(executor.submit(new SplitCallable(range, conf)));
-                }
-                else
-                {
-                    Range dhtRange = new Range(partitioner.getTokenFactory().fromString(range.start_token),
-                                               partitioner.getTokenFactory().fromString(range.end_token),
-                                               partitioner);
-
-                    if (dhtRange.intersects(jobRange))
-                    {
-                        Set<Range> intersections = dhtRange.intersectionWith(jobRange);
-                        assert intersections.size() == 1 : "wrapping ranges not supported";
-                        Range intersection = intersections.iterator().next();
-                        range.start_token = partitioner.getTokenFactory().toString(intersection.left);
-                        range.end_token = partitioner.getTokenFactory().toString(intersection.right);
-                        // for each range, pick a live owner and ask it to compute bite-sized splits
-                        splitfutures.add(executor.submit(new SplitCallable(range, conf)));
-                    }
-                }
             }
 
             // wait until we have all the results back

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Jul  4 13:02:05 2011
@@ -22,7 +22,6 @@ package org.apache.cassandra.hadoop;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 import org.apache.cassandra.utils.FBUtilities;
@@ -43,7 +42,6 @@ public class ConfigHelper
     private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
     private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
     private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
-    private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
     private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
     private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
@@ -197,51 +195,6 @@ public class ConfigHelper
         return predicate;
     }
 
-    /**
-     * Set the KeyRange to limit the rows.
-     * @param conf Job configuration you are about to run
-     * @param keyRange
-     */
-    public static void setInputKeyRange(Configuration conf, KeyRange keyRange){
-        conf.set(INPUT_KEYRANGE_CONFIG, keyRangeToString(keyRange));
-    }
-
-    /** may be null if unset */
-    public static KeyRange getInputKeyRange(Configuration conf){
-        String str = conf.get(INPUT_KEYRANGE_CONFIG);
-        return null != str ? keyRangeFromString(str) : null;
-    }
-
-    private static String keyRangeToString(KeyRange keyRange)
-    {
-        assert keyRange != null;
-        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
-        try
-        {
-            return FBUtilities.bytesToHex(serializer.serialize(keyRange));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static KeyRange keyRangeFromString(String st)
-    {
-        assert st != null;
-        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        KeyRange keyRange = new KeyRange();
-        try
-        {
-            deserializer.deserialize(keyRange, FBUtilities.hexToBytes(st));
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return keyRange;
-    }
-
     public static String getInputKeyspace(Configuration conf)
     {
         return conf.get(INPUT_KEYSPACE_CONFIG);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Mon Jul  4 13:02:05 2011
@@ -42,45 +42,27 @@ public class IndexHelper
      * @param in the data input from which the bloom filter should be skipped
      * @throws IOException
      */
-    public static void skipBloomFilter(DataInput in) throws IOException
+    public static void skipBloomFilter(FileDataInput in) throws IOException
     {
         /* size of the bloom filter */
         int size = in.readInt();
         /* skip the serialized bloom filter */
-        if (in instanceof FileDataInput)
-        {
-            FileUtils.skipBytesFully(in, size);
-        }
-        else
-        {
-            // skip bytes
-            byte[] skip = new byte[size];
-            in.readFully(skip);
-        }
+        FileUtils.skipBytesFully(in, size);
     }
 
-    /**
-     * Skip the index
-     * @param in the data input from which the index should be skipped
-     * @throws IOException if an I/O error occurs.
-     */
-    public static void skipIndex(DataInput in) throws IOException
-    {
+	/**
+	 * Skip the index
+	 * @param file the data input from which the index should be skipped
+	 * @throws IOException if an I/O error occurs.
+	 */
+	public static void skipIndex(FileDataInput file) throws IOException
+	{
         /* read only the column index list */
-        int columnIndexSize = in.readInt();
+        int columnIndexSize = file.readInt();
         /* skip the column index data */
-        if (in instanceof FileDataInput)
-        {
-            FileUtils.skipBytesFully(in, columnIndexSize);
-        }
-        else
-        {
-            // skip bytes
-            byte[] skip = new byte[columnIndexSize];
-            in.readFully(skip);
-        }
-    }
-
+        FileUtils.skipBytesFully(file, columnIndexSize);
+	}
+    
     /**
      * Deserialize the index into a structure and return it
      *

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Mon Jul  4 13:02:05 2011
@@ -21,11 +21,11 @@ package org.apache.cassandra.io.sstable;
  */
 
 
-import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOError;
 import java.io.IOException;
+import java.util.ArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.Filter;
 
 public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
 {
@@ -45,16 +45,14 @@ public class SSTableIdentityIterator imp
 
     private final DecoratedKey key;
     private final long finishedAt;
-    private final DataInput input;
+    private final BufferedRandomAccessFile file;
     private final long dataStart;
     public final long dataSize;
     public final boolean fromRemote;
 
     private final ColumnFamily columnFamily;
     public final int columnCount;
-    private long columnPosition;
-
-    private BytesReadTracker inputWithTracker; // tracks bytes read
+    private final long columnPosition;
 
     // Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
     private final int expireBefore;
@@ -92,18 +90,17 @@ public class SSTableIdentityIterator imp
         this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
     }
 
-    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
+    public SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
     throws IOException
     {
         this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
     }
 
     // sstable may be null *if* deserializeRowHeader is false
-    private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+    private SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
     throws IOException
     {
-        this.input = input;
-        this.inputWithTracker = new BytesReadTracker(input);
+        this.file = file;
         this.key = key;
         this.dataStart = dataStart;
         this.dataSize = dataSize;
@@ -114,47 +111,38 @@ public class SSTableIdentityIterator imp
 
         try
         {
-            if (input instanceof BufferedRandomAccessFile)
+            file.seek(this.dataStart);
+            if (checkData)
             {
-                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
-                file.seek(this.dataStart);
-                if (checkData)
+                try
+                {
+                    IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
+                }
+                catch (Exception e)
+                {
+                    if (e instanceof EOFException)
+                        throw (EOFException) e;
+
+                    logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
+                    // deFreeze should have left the file position ready to deserialize index
+                }
+                try
                 {
-                    try
-                    {
-                        IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
-                    }
-                    catch (Exception e)
-                    {
-                        if (e instanceof EOFException)
-                            throw (EOFException) e;
-
-                        logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
-                        // deFreeze should have left the file position ready to deserialize index
-                    }
-                    try
-                    {
-                        IndexHelper.deserializeIndex(file);
-                    }
-                    catch (Exception e)
-                    {
-                        logger.debug("Invalid row summary in {}; will rebuild it", sstable);
-                    }
-                    file.seek(this.dataStart);
+                    IndexHelper.deserializeIndex(file);
                 }
+                catch (Exception e)
+                {
+                    logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+                }
+                file.seek(this.dataStart);
             }
 
-            IndexHelper.skipBloomFilter(inputWithTracker);
-            IndexHelper.skipIndex(inputWithTracker);
+            IndexHelper.skipBloomFilter(file);
+            IndexHelper.skipIndex(file);
             columnFamily = ColumnFamily.create(metadata);
-            ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker);
-            columnCount = inputWithTracker.readInt();
-
-            if (input instanceof BufferedRandomAccessFile)
-            {
-                BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
-                columnPosition = file.getFilePointer();
-            }
+            ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, file);
+            columnCount = file.readInt();
+            columnPosition = file.getFilePointer();
         }
         catch (IOException e)
         {
@@ -174,22 +162,14 @@ public class SSTableIdentityIterator imp
 
     public boolean hasNext()
     {
-        if (input instanceof BufferedRandomAccessFile)
-        {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
-            return file.getFilePointer() < finishedAt;
-        }
-        else
-        {
-            return inputWithTracker.getBytesRead() < dataSize;
-        }
+        return file.getFilePointer() < finishedAt;
     }
 
     public IColumn next()
     {
         try
         {
-            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, null, fromRemote, expireBefore);
+            IColumn column = columnFamily.getColumnSerializer().deserialize(file, null, fromRemote, expireBefore);
             if (validateColumns)
                 column.validateFields(columnFamily.metadata());
             return column;
@@ -216,50 +196,23 @@ public class SSTableIdentityIterator imp
 
     public String getPath()
     {
-        // if input is from file, then return that path, otherwise it's from streaming
-        if (input instanceof BufferedRandomAccessFile)
-        {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
-            return file.getPath();
-        }
-        else
-        {
-            throw new UnsupportedOperationException();
-        }
+        return file.getPath();
     }
 
     public void echoData(DataOutput out) throws IOException
     {
-        // only effective when input is from file
-        if (input instanceof BufferedRandomAccessFile)
+        file.seek(dataStart);
+        while (file.getFilePointer() < finishedAt)
         {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
-            file.seek(dataStart);
-            while (file.getFilePointer() < finishedAt)
-            {
-                out.write(file.readByte());
-            }
-        }
-        else
-        {
-            throw new UnsupportedOperationException();
+            out.write(file.readByte());
         }
     }
 
     public ColumnFamily getColumnFamilyWithColumns() throws IOException
     {
+        file.seek(columnPosition - 4); // seek to before column count int
         ColumnFamily cf = columnFamily.cloneMeShallow();
-        if (input instanceof BufferedRandomAccessFile)
-        {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
-            file.seek(columnPosition - 4); // seek to before column count int
-            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, false, fromRemote);
-        }
-        else
-        {
-            // since we already read column count, just pass that value and continue deserialization
-            ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote);
-        }
+        ColumnFamily.serializer().deserializeColumns(file, cf, false, fromRemote);
         if (validateColumns)
         {
             try
@@ -281,23 +234,13 @@ public class SSTableIdentityIterator imp
 
     public void reset()
     {
-        // only effective when input is from file
-        if (input instanceof BufferedRandomAccessFile)
+        try
         {
-            BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
-            try
-            {
-                file.seek(columnPosition);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-            inputWithTracker.reset();
+            file.seek(columnPosition);
         }
-        else
+        catch (IOException e)
         {
-            throw new UnsupportedOperationException();
+            throw new IOError(e);
         }
     }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Mon Jul  4 13:02:05 2011
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -27,27 +25,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
-import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
 public class IncomingStreamReader
@@ -88,162 +70,51 @@ public class IncomingStreamReader
             logger.debug("Receiving stream");
             logger.debug("Creating file for {}", localFile.getFilename());
         }
+        FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
+        FileChannel fc = fos.getChannel();
 
-        SSTableReader reader = null;
-        if (remoteFile.estimatedKeys > 0)
+        long offset = 0;
+        try
         {
-            logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
-            DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream());
-            try
+            for (Pair<Long, Long> section : localFile.sections)
             {
-                reader = streamIn(dis, localFile, remoteFile);
-            }
-            catch (IOException ex)
-            {
-                retry();
-                throw ex;
-            }
-            finally
-            {
-                dis.close();
+                long length = section.right - section.left;
+                long bytesRead = 0;
+                while (bytesRead < length)
+                {
+                    bytesRead = readnwrite(length, bytesRead, offset, fc);
+                }
+                offset += length;
             }
         }
-        else
+        catch (IOException ex)
         {
-            FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
-            FileChannel fc = fos.getChannel();
+            /* Ask the source node to re-stream this file. */
+            session.retry(remoteFile);
 
-            long offset = 0;
-            try
-            {
-                for (Pair<Long, Long> section : localFile.sections)
-                {
-                    long length = section.right - section.left;
-                    long bytesRead = 0;
-                    while (bytesRead < length)
-                    {
-                        bytesRead = readnwrite(length, bytesRead, offset, fc);
-                    }
-                    offset += length;
-                }
-            }
-            catch (IOException ex)
-            {
-                retry();
-                throw ex;
-            }
-            finally
-            {
-                fc.close();
-            }
+            /* Delete the orphaned file. */
+            FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
+            throw ex;
+        }
+        finally
+        {
+            fc.close();
         }
 
-        session.finished(remoteFile, localFile, reader);
+        session.finished(remoteFile, localFile);
     }
 
     protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
     {
         long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
         long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
-        // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
-        // and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and 
-        // raise an exception (that will be catch belove and 'the right thing' will be done).
-        if (lastRead == 0)
-            throw new IOException("Transfer failed for remote file " + remoteFile);
+	// if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
+	// and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and 
+	// raise an exception (that will be catch belove and 'the right thing' will be done).
+	if (lastRead == 0)
+		throw new IOException("Transfer failed for remote file " + remoteFile);
         bytesRead += lastRead;
         remoteFile.progress += lastRead;
         return bytesRead;
     }
-
-    private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
-    {
-        ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
-        DecoratedKey key;
-        SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
-        CompactionController controller = null;
-
-        BytesReadTracker in = new BytesReadTracker(input);
-
-        for (Pair<Long, Long> section : localFile.sections)
-        {
-            long length = section.right - section.left;
-            long bytesRead = 0;
-            while (bytesRead < length)
-            {
-                key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
-                long dataSize = SSTableReader.readRowSize(in, localFile.desc);
-                ColumnFamily cf = null;
-                if (cfs.metadata.getDefaultValidator().isCommutative())
-                {
-                    // take care of counter column family
-                    if (controller == null)
-                        controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
-                    SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
-                    AbstractCompactedRow row = controller.getCompactedRow(iter);
-                    writer.append(row);
-
-                    if (row instanceof PrecompactedRow)
-                    {
-                        // we do not purge so we should not get a null here
-                        cf = ((PrecompactedRow)row).getFullColumnFamily();
-                    }
-                }
-                else
-                {
-                    // skip BloomFilter
-                    IndexHelper.skipBloomFilter(in);
-                    // skip Index
-                    IndexHelper.skipIndex(in);
-
-                    // restore ColumnFamily
-                    cf = ColumnFamily.create(cfs.metadata);
-                    ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
-                    ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
-
-                    // write key and cf
-                    writer.append(key, cf);
-                }
-
-                // update cache
-                ColumnFamily cached = cfs.getRawCachedRow(key);
-                if (cached != null)
-                {
-                    switch (remoteFile.type)
-                    {
-                        case AES:
-                            if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-                            {
-                                // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
-                                // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
-                                logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
-                                cfs.invalidateCachedRow(key);
-                            }
-                            else
-                            {
-                                assert cf != null;
-                                cfs.updateRowCache(key, cf);
-                            }
-                            break;
-                        default:
-                            cfs.invalidateCachedRow(key);
-                            break;
-                    }
-                }
-
-                bytesRead += in.getBytesRead();
-                remoteFile.progress += in.getBytesRead();
-            }
-        }
-
-        return writer.closeAndOpenReader();
-    }
-
-    private void retry() throws IOException
-    {
-        /* Ask the source node to re-stream this file. */
-        session.retry(remoteFile);
-
-        /* Delete the orphaned file. */
-        FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
-    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java Mon Jul  4 13:02:05 2011
@@ -53,21 +53,15 @@ public class PendingFile
     public final List<Pair<Long,Long>> sections;
     public final OperationType type;
     public final long size;
-    public final long estimatedKeys;
     public long progress;
 
     public PendingFile(Descriptor desc, PendingFile pf)
     {
-        this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys);
+        this(null, desc, pf.component, pf.sections, pf.type);
     }
 
     public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
     {
-        this(sstable, desc, component, sections, type, 0);
-    }
-    
-    public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
-    {
         this.sstable = sstable;
         this.desc = desc;
         this.component = component;
@@ -80,8 +74,6 @@ public class PendingFile
             tempSize += section.right - section.left;
         }
         size = tempSize;
-
-        this.estimatedKeys = estimatedKeys;
     }
 
     public String getFilename()
@@ -127,7 +119,6 @@ public class PendingFile
             }
             if (version > MessagingService.VERSION_07)
                 dos.writeUTF(sc.type.name());
-            dos.writeLong(sc.estimatedKeys);
         }
 
         public PendingFile deserialize(DataInputStream dis, int version) throws IOException
@@ -146,8 +137,7 @@ public class PendingFile
             OperationType type = OperationType.RESTORE_REPLICA_COUNT;
             if (version > MessagingService.VERSION_07)
                 type = OperationType.valueOf(dis.readUTF());
-            long estimatedKeys = dis.readLong();
-            return new PendingFile(null, desc, component, sections, type, estimatedKeys);
+            return new PendingFile(null, desc, component, sections, type);
         }
     }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java Mon Jul  4 13:02:05 2011
@@ -51,7 +51,6 @@ public class StreamInSession
     private final Runnable callback;
     private String table;
     private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>();
-    private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -103,21 +102,13 @@ public class StreamInSession
         }
     }
 
-    public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException
+    public void finished(PendingFile remoteFile, PendingFile localFile) throws IOException
     {
         if (logger.isDebugEnabled())
             logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
 
-        if (reader != null)
-        {
-            // SSTR was already built during streaming
-            readers.add(reader);
-        }
-        else
-        {
-            Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
-            buildFutures.add(future);
-        }
+        Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
+        buildFutures.add(future);
 
         files.remove(remoteFile);
         if (remoteFile.equals(current))
@@ -145,7 +136,14 @@ public class StreamInSession
                 try
                 {
                     SSTableReader sstable = future.get();
-                    readers.add(sstable);
+                    assert sstable.getTableName().equals(table);
+                    if (sstable == null)
+                        continue;
+                    ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+                    cfs.addSSTable(sstable);
+                    if (!cfstores.containsKey(cfs))
+                        cfstores.put(cfs, new ArrayList<SSTableReader>());
+                    cfstores.get(cfs).add(sstable);
                 }
                 catch (InterruptedException e)
                 {
@@ -156,18 +154,6 @@ public class StreamInSession
                     throw new RuntimeException(e);
                 }
             }
-            
-            for (SSTableReader sstable : readers)
-            {
-                assert sstable.getTableName().equals(table);
-                if (sstable == null)
-                    continue;
-                ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
-                cfs.addSSTable(sstable);
-                if (!cfstores.containsKey(cfs))
-                    cfstores.put(cfs, new ArrayList<SSTableReader>());
-                cfstores.get(cfs).add(sstable);
-            }
 
             // build secondary indexes
             for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java Mon Jul  4 13:02:05 2011
@@ -176,7 +176,7 @@ public class StreamOut
             List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
             if (sections.isEmpty())
                 continue;
-            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeys()));
+            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
         }
         logger.info("Stream context metadata {}, {} sstables.", pending, sstables.size());
         return pending;