Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/26 06:30:44 UTC

svn commit: r1150984 - in /cassandra/trunk: src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/security/streaming/ src/java/org/apache/cassandra/stream...

Author: jbellis
Date: Tue Jul 26 04:30:43 2011
New Revision: 1150984

URL: http://svn.apache.org/viewvc?rev=1150984&view=rev
Log:
Remove SSTableWriter.Builder
patch by jbellis and stuhood; reviewed by Yuki Morishita for CASSANDRA-2920

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

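Context for the change: SSTableWriter.Builder existed to rebuild the index and
bloom filter of a streamed data file in a second pass after the transfer
completed. With one-pass streaming (see the IncomingStreamReader changes
below), the receiver deserializes rows as they arrive and builds the sstable
in place, so the Builder and all of its callers can be removed. A rough sketch
of the receive path after this patch, using names from the diff below
(illustrative, not a verbatim excerpt):

    // One-pass receive: the sstable is fully built while bytes stream in,
    // so no post-transfer rebuild step is needed.
    DataInputStream dis = new DataInputStream(socket.getInputStream());
    SSTableReader reader = streamIn(dis, localFile, remoteFile); // data + index + filter
    session.finished(remoteFile, reader); // reader is already open and usable
    session.closeIfFinished();            // adds collected readers to their CFs
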
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Tue Jul 26 04:30:43 2011
@@ -857,39 +857,6 @@ public class CompactionManager implement
             return executor.submit(runnable);
     }
 
-    /**
-     * Submits an sstable to be rebuilt; it is not scheduled, since the sstable must not yet exist.
-     */
-    public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
-    {
-        // invalid descriptors due to a missing or dropped CFS are handled by SSTW and StreamInSession.
-        final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc, type);
-        Callable<SSTableReader> callable = new Callable<SSTableReader>()
-        {
-            public SSTableReader call() throws IOException
-            {
-                compactionLock.readLock().lock();
-                try
-                {
-                    executor.beginCompaction(builder);
-                    try
-                    {
-                        return builder.build();
-                    }
-                    finally
-                    {
-                        executor.finishCompaction(builder);
-                    }
-                }
-                finally
-                {
-                    compactionLock.readLock().unlock();
-                }
-            }
-        };
-        return executor.submit(callable);
-    }
-
     public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
     {
         Runnable runnable = new WrappedRunnable()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jul 26 04:30:43 2011
@@ -170,6 +170,11 @@ public class SSTableWriter extends SSTab
         afterAppend(decoratedKey, currentPosition);
     }
 
+    public void updateMaxTimestamp(long timestamp)
+    {
+        sstableMetadataCollector.updateMaxTimestamp(timestamp);
+    }
+
     /**
      * Attempt to close the index writer and data file before deleting all temp components for the sstable
      */
@@ -258,351 +263,6 @@ public class SSTableWriter extends SSTab
     {
         return dataFile.getFilePointer();
     }
-    
-    public static Builder createBuilder(Descriptor desc, OperationType type)
-    {
-        if (!desc.isLatestVersion)
-            // TODO: streaming between different versions will fail: need support for
-            // recovering other versions to provide a stable streaming api
-            throw new RuntimeException(String.format("Cannot recover SSTable %s due to version mismatch. (current version is %s).", desc.toString()
-                                                     , Descriptor.CURRENT_VERSION));
-
-        return new Builder(desc, type);
-    }
-
-    /**
-     * Removes the given SSTable from temporary status and opens it, rebuilding the
-     * bloom filter and row index from the data file.
-     *
-     * TODO remove this post-1.0, we have one-pass streaming now (see IncomingStreamReader)
-     */
-    public static class Builder implements CompactionInfo.Holder
-    {
-        private final Descriptor desc;
-        private final OperationType type;
-        private final ColumnFamilyStore cfs;
-        private RowIndexer indexer;
-
-        public Builder(Descriptor desc, OperationType type)
-        {
-            this.desc = desc;
-            this.type = type;
-            cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
-        }
-
-        public CompactionInfo getCompactionInfo()
-        {
-            maybeOpenIndexer();
-            try
-            {
-                // both file offsets are still valid post-close
-                return new CompactionInfo(desc.ksname,
-                                          desc.cfname,
-                                          CompactionType.SSTABLE_BUILD,
-                                          indexer.dfile.getFilePointer(),
-                                          indexer.dfile.length());
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
-        // since the 8MB buffers can use up heap quickly
-        private void maybeOpenIndexer()
-        {
-            if (indexer != null)
-                return;
-            try
-            {
-                if (cfs.metadata.getDefaultValidator().isCommutative())
-                    indexer = new CommutativeRowIndexer(desc, cfs, type);
-                else
-                    indexer = new RowIndexer(desc, cfs, type);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        public SSTableReader build() throws IOException
-        {
-            try
-            {
-                if (cfs.isInvalid())
-                    return null;
-                maybeOpenIndexer();
-
-                File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
-                File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
-                assert !ifile.exists();
-                assert !ffile.exists();
-
-                long estimatedRows = indexer.prepareIndexing();
-
-                // build the index and filter
-                long rows = indexer.index();
-
-                logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
-                return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, Descriptor.TempState.ANY)));
-            }
-            finally
-            {
-                cleanupIfNecessary();
-            }
-        }
-
-        /**
-        * Attempt to close the index writer before deleting all temp components for the sstable
-        */
-        public void cleanupIfNecessary()
-        {
-            FileUtils.closeQuietly(indexer);
-
-            try
-            {
-                Set<Component> components = SSTable.componentsFor(desc, Descriptor.TempState.TEMP);
-                if (!components.isEmpty())
-                    SSTable.delete(desc, components);
-            }
-            catch (Exception e)
-            {
-                logger.error(String.format("Failed deleting temp components for %s", desc), e);
-            }
-        }
-
-    }
-
-    static class RowIndexer implements Closeable
-    {
-        protected final Descriptor desc;
-        public final RandomAccessReader dfile;
-        private final OperationType type;
-
-        protected IndexWriter iwriter;
-        protected ColumnFamilyStore cfs;
-        protected final SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector();
-
-        RowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
-        {
-            this(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type);
-        }
-
-        protected RowIndexer(Descriptor desc, RandomAccessReader dfile, ColumnFamilyStore cfs, OperationType type) throws IOException
-        {
-            this.desc = desc;
-            this.dfile = dfile;
-            this.type = type;
-            this.cfs = cfs;
-        }
-
-        long prepareIndexing() throws IOException
-        {
-            long estimatedRows;
-            try
-            {
-                estimatedRows = SSTable.estimateRowsFromData(desc, dfile);
-                iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
-                return estimatedRows;
-            }
-            catch(IOException e)
-            {
-                dfile.close();
-                throw e;
-            }
-        }
-
-        long index() throws IOException
-        {
-            try
-            {
-                return doIndexing();
-            }
-            finally
-            {
-                try
-                {
-                    close();
-                }
-                catch (IOException e)
-                {
-                    throw new IOError(e);
-                }
-            }
-        }
-
-        public void close() throws IOException
-        {
-            dfile.close();
-            iwriter.close();
-        }
-
-        /*
-         * If the key is cached, we should:
-         *   - For AES: run the newly received row by the cache
-         *   - For other: invalidate the cache (even if very unlikely, a key could in theory be in the cache if a neighbor was bootstrapped and
-         *     then removed quickly afterward (a key that we had lost but became responsible for again could have stayed in the cache). That key
-         *     would be obsolete, so we must invalidate the cache).
-         */
-        protected void updateCache(DecoratedKey key, long dataSize, AbstractCompactedRow row) throws IOException
-        {
-            ColumnFamily cached = cfs.getRawCachedRow(key);
-            if (cached != null)
-            {
-                switch (type)
-                {
-                    case AES:
-                        if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-                        {
-                            // We have a key in the cache for a very big row; that is fishy. We don't fail here, however, because that would prevent the sstable
-                            // from being built (and there is no real point anyway), so we just invalidate the row for correctness and log a warning.
-                            logger.warn("Found a cached row over the in-memory compaction limit during post-streaming rebuild; it is highly recommended to avoid huge rows on column families with the row cache enabled.");
-                            cfs.invalidateCachedRow(key);
-                        }
-                        else
-                        {
-                            ColumnFamily cf;
-                            if (row == null)
-                            {
-                                // If not provided, read from disk.
-                                long position = dfile.getFilePointer();
-                                cf = ColumnFamily.create(cfs.metadata);
-                                ColumnFamily.serializer().deserializeColumns(dfile, cf, true, true);
-                                dfile.seek(position);
-                            }
-                            else
-                            {
-                                assert row instanceof PrecompactedRow;
-                                // we do not purge so we should not get a null here
-                                cf = ((PrecompactedRow)row).getFullColumnFamily();
-                            }
-                            cfs.updateRowCache(key, cf);
-                        }
-                        break;
-                    default:
-                        cfs.invalidateCachedRow(key);
-                        break;
-                }
-            }
-        }
-
-        protected long doIndexing() throws IOException
-        {
-            long rows = 0;
-            DecoratedKey key;
-            long rowPosition = 0;
-            ColumnFamily cf = ColumnFamily.create(cfs.metadata);
-            while (rowPosition < dfile.length())
-            {
-                // read key
-                key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(dfile));
-                iwriter.afterAppend(key, rowPosition);
-
-                // seek to next key
-                long dataSize = SSTableReader.readRowSize(dfile, desc);
-                rowPosition = dfile.getFilePointer() + dataSize;
-
-                IndexHelper.skipBloomFilter(dfile);
-                IndexHelper.skipIndex(dfile);
-                ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
-
-                // We can't simply get the max column timestamp here by calling cf.maxTimestamp() because
-                // the columns have not been deserialized yet. observeColumnsInSSTable() will deserialize
-                // and get the max timestamp instead.
-                ColumnFamily.serializer().observeColumnsInSSTable(cfs.metadata, dfile, sstableMetadataCollector);
-
-                // don't move this statement around: it expects dfile to be positioned before the columns
-                updateCache(key, dataSize, null);
-
-                sstableMetadataCollector.addRowSize(dataSize);
-                
-                dfile.seek(rowPosition);
-
-                rows++;
-            }
-            writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
-            return rows;
-        }
-
-        public String toString()
-        {
-            return "RowIndexer(" + desc + ")";
-        }
-    }
-
-    /*
-     * When an sstable for a counter column family is streamed, we must ensure
-     * that on the receiving node every counter column goes through the
-     * deserialize-from-remote code path (i.e., it must be cleared of its
-     * delta) to maintain the invariant that on a given node, only increments
-     * that the node itself originated are deltas (and copies of those must not be).
-     *
-     * Since row indexing runs over every streamed sstable after streaming,
-     * we use that opportunity to enforce this property. That is the purpose
-     * of this CommutativeRowIndexer.
-     */
-    static class CommutativeRowIndexer extends RowIndexer
-    {
-        protected SequentialWriter writerDfile;
-
-        CommutativeRowIndexer(Descriptor desc, ColumnFamilyStore cfs, OperationType type) throws IOException
-        {
-            super(desc, RandomAccessReader.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true), cfs, type);
-            writerDfile = SequentialWriter.open(new File(desc.filenameFor(SSTable.COMPONENT_DATA)), 8 * 1024 * 1024, true);
-        }
-
-        @Override
-        protected long doIndexing() throws IOException
-        {
-            long rows = 0L;
-            DecoratedKey key;
-
-            CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
-            while (!dfile.isEOF())
-            {
-                // read key
-                key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(dfile));
-
-                // skip data size, bloom filter, column index
-                long dataSize = SSTableReader.readRowSize(dfile, desc);
-                SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, dfile, key, dfile.getFilePointer(), dataSize, true);
-
-                AbstractCompactedRow row = controller.getCompactedRow(iter);
-                updateCache(key, dataSize, row);
-
-                sstableMetadataCollector.addRowSize(dataSize);
-                sstableMetadataCollector.addColumnCount(row.columnCount());
-                sstableMetadataCollector.updateMaxTimestamp(row.maxTimestamp());
-
-                // update index writer
-                iwriter.afterAppend(key, writerDfile.getFilePointer());
-                // write key and row
-                ByteBufferUtil.writeWithShortLength(key.key, writerDfile.stream);
-                row.write(writerDfile.stream);
-
-                rows++;
-            }
-            writeMetadata(desc, sstableMetadataCollector.finalizeMetadata());
-
-            if (writerDfile.getFilePointer() != dfile.getFilePointer())
-            {
-                // truncate file to new, reduced length
-                writerDfile.truncate(writerDfile.getFilePointer());
-            }
-            writerDfile.sync();
-
-            return rows;
-        }
-
-        @Override
-        public void close() throws IOException
-        {
-            super.close();
-            writerDfile.close();
-        }
-    }
 
     /**
      * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.

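Note on the new method above: updateMaxTimestamp simply forwards to the
writer's SSTableMetadata.Collector, the same collector the removed indexers
fed through addRowSize/updateMaxTimestamp. A minimal sketch of the collector
pipeline as it is used elsewhere in this diff (illustrative; the local
variable names are made up):

    // The collector accumulates per-row statistics while rows are written,
    // then is folded into the sstable's metadata component on close.
    SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
    collector.addRowSize(dataSize);          // feeds the row-size histogram
    collector.updateMaxTimestamp(timestamp); // tracks the newest column seen
    writeMetadata(desc, collector.finalizeMetadata());
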
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jul 26 04:30:43 2011
@@ -28,9 +28,6 @@ import org.apache.cassandra.gms.Gossiper
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.security.streaming.SSLIncomingStreamReader;
 import org.apache.cassandra.streaming.IncomingStreamReader;
 import org.apache.cassandra.streaming.StreamHeader;
 
@@ -171,9 +168,6 @@ public class IncomingTcpConnection exten
 
     private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException
     {
-        if (DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all)
-            new SSLIncomingStreamReader(streamHeader, socket, input).read();
-        else
-            new IncomingStreamReader(streamHeader, socket).read();
+        new IncomingStreamReader(streamHeader, socket).read();
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/security/streaming/SSLIncomingStreamReader.java Tue Jul 26 04:30:43 2011
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.security.streaming;
-
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.io.IOException;
-import java.io.DataInputStream;
-
-import org.apache.cassandra.streaming.FileStreamTask;
-import org.apache.cassandra.streaming.IncomingStreamReader;
-import org.apache.cassandra.streaming.StreamHeader;
-
-/**
- * This class uses a DataInputStream to read data, as opposed to the FileChannel.transferFrom
- * used by IncomingStreamReader, because the underlying SSLServerSocket doesn't support
- * encryption over an NIO SocketChannel.
- */
-public class SSLIncomingStreamReader extends IncomingStreamReader
-{
-    private final DataInputStream input;
-
-    public SSLIncomingStreamReader(StreamHeader header, Socket socket, DataInputStream input) throws IOException
-    {
-        super(header, socket);
-        this.input = input;
-    }
-
-    @Override
-    protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
-    {
-        int toRead = (int)Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
-        ByteBuffer buf = ByteBuffer.allocate(toRead);
-        input.readFully(buf.array());
-        fc.write(buf);
-        bytesRead += buf.limit();
-        remoteFile.progress += buf.limit();
-        return bytesRead;
-    }
-}

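The SSL-specific reader is removable because it existed only to work around
FileChannel.transferFrom, which requires an NIO channel that SSLSocket does
not provide. The reworked IncomingStreamReader (next diff) reads from the
socket's blocking InputStream, which plain and SSL sockets both expose, so a
single code path now serves both (sketch):

    // Works for plain and SSL sockets alike; no NIO channel required.
    DataInputStream dis = new DataInputStream(socket.getInputStream());
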
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Tue Jul 26 04:30:43 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.streaming;
 import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Collections;
 
@@ -52,12 +51,12 @@ public class IncomingStreamReader
 
     protected final PendingFile localFile;
     protected final PendingFile remoteFile;
-    private final SocketChannel socketChannel;
     protected final StreamInSession session;
+    private final Socket socket;
 
     public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
     {
-        this.socketChannel = socket.getChannel();
+        this.socket = socket;
         InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
         session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
         session.addFiles(header.pendingFiles);
@@ -72,26 +71,19 @@ public class IncomingStreamReader
     public void read() throws IOException
     {
         if (remoteFile != null)
-            readFile();
-
-        session.closeIfFinished();
-    }
-
-    protected void readFile() throws IOException
-    {
-        if (logger.isDebugEnabled())
         {
-            logger.debug("Receiving stream");
-            logger.debug("Creating file for {} with {} estimated keys",
-                         localFile.getFilename(),
-                         remoteFile.estimatedKeys);
-        }
+            if (logger.isDebugEnabled())
+            {
+                logger.debug("Receiving stream");
+                logger.debug("Creating file for {} with {} estimated keys",
+                             localFile.getFilename(),
+                             remoteFile.estimatedKeys);
+            }
 
-        SSTableReader reader = null;
-        if (remoteFile.estimatedKeys > 0)
-        {
+            assert remoteFile.estimatedKeys > 0;
+            SSTableReader reader = null;
             logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
-            DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream());
+            DataInputStream dis = new DataInputStream(socket.getInputStream());
             try
             {
                 reader = streamIn(dis, localFile, remoteFile);
@@ -105,53 +97,11 @@ public class IncomingStreamReader
             {
                 dis.close();
             }
-        }
-        else
-        {
-            // backwards compatibility path
-            FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
-            FileChannel fc = fos.getChannel();
 
-            long offset = 0;
-            try
-            {
-                for (Pair<Long, Long> section : localFile.sections)
-                {
-                    long length = section.right - section.left;
-                    long bytesRead = 0;
-                    while (bytesRead < length)
-                    {
-                        bytesRead = readnwrite(length, bytesRead, offset, fc);
-                    }
-                    offset += length;
-                }
-            }
-            catch (IOException ex)
-            {
-                retry();
-                throw ex;
-            }
-            finally
-            {
-                fc.close();
-            }
+            session.finished(remoteFile, reader);
         }
 
-        session.finished(remoteFile, localFile, reader);
-    }
-
-    protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
-    {
-        long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
-        long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
-        // if the other side fails, we will not get an exception; instead, transferFrom will constantly return 0 bytes read,
-        // and we would thus enter an infinite loop. So instead, if no bytes are transferred we assume the other side is dead and
-        // raise an exception (which will be caught below and 'the right thing' will be done).
-        if (lastRead == 0)
-            throw new IOException("Transfer failed for remote file " + remoteFile);
-        bytesRead += lastRead;
-        remoteFile.progress += lastRead;
-        return bytesRead;
+        session.closeIfFinished();
     }
 
     private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
@@ -183,6 +133,8 @@ public class IncomingStreamReader
                         SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
                         AbstractCompactedRow row = controller.getCompactedRow(iter);
                         writer.append(row);
+                        // row append does not update the max timestamp on its own
+                        writer.updateMaxTimestamp(row.maxTimestamp());
 
                         if (row instanceof PrecompactedRow)
                         {

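About the updateMaxTimestamp call added above: this append() overload takes an
AbstractCompactedRow whose contents are written as a pre-serialized block, so
the writer has no chance to observe individual column timestamps (which is
presumably why the comment notes that row append does not update the max
timestamp on its own). Annotated sketch of the loop body from the hunk above:

    SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
    AbstractCompactedRow row = controller.getCompactedRow(iter);
    writer.append(row);                            // writes pre-built row bytes
    writer.updateMaxTimestamp(row.maxTimestamp()); // record the timestamp explicitly
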
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Jul 26 04:30:43 2011
@@ -50,7 +50,6 @@ public class StreamInSession
     private final Pair<InetAddress, Long> context;
     private final Runnable callback;
     private String table;
-    private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>();
     private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
 
@@ -103,22 +102,13 @@ public class StreamInSession
         }
     }
 
-    public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException
+    public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException
     {
         if (logger.isDebugEnabled())
             logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
 
-        if (reader != null)
-        {
-            // SSTR was already built during streaming
-            readers.add(reader);
-        }
-        else
-        {
-            Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
-            buildFutures.add(future);
-        }
-
+        assert reader != null;
+        readers.add(reader);
         files.remove(remoteFile);
         if (remoteFile.equals(current))
             current = null;
@@ -143,33 +133,6 @@ public class StreamInSession
             List<SSTableReader> referenced = new LinkedList<SSTableReader>();
             try
             {
-                for (Future<SSTableReader> future : buildFutures)
-                {
-                    try
-                    {
-                        SSTableReader sstable = future.get();
-                        assert sstable.getTableName().equals(table);
-
-                        // Acquiring the reference (for secondary index building) before adding it makes sure we don't have to care about races
-                        sstable.acquireReference();
-                        referenced.add(sstable);
-
-                        ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
-                        cfs.addSSTable(sstable);
-                        if (!cfstores.containsKey(cfs))
-                            cfstores.put(cfs, new ArrayList<SSTableReader>());
-                        cfstores.get(cfs).add(sstable);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                }
-
                 for (SSTableReader sstable : readers)
                 {
                     assert sstable.getTableName().equals(table);

Modified: cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java (original)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java Tue Jul 26 04:30:43 2011
@@ -73,33 +73,6 @@ public class LongCompactionSpeedTest ext
         testCompaction(100, 800, 5);
     }
 
-    /**
-     * Test aes counter repair with a very wide row.
-     */
-    @Test
-    public void testAESCountersRepairWide() throws Exception
-    {
-        testAESCountersRepair(2, 1, 500000);
-    }
-
-    /**
-     * Test aes counter repair with lots of skinny rows.
-     */
-    @Test
-    public void testAESCountersRepairSlim() throws Exception
-    {
-        testAESCountersRepair(2, 500000, 1);
-    }
-
-    /**
-     * Test aes counter repair with lots of small sstables.
-     */
-    @Test
-    public void testAESCounterRepairMany() throws Exception
-    {
-        testAESCountersRepair(100, 1000, 5);
-    }
-
     protected void testCompaction(int sstableCount, int rowsPerSSTable, int colsPerRow) throws Exception
     {
         CompactionManager.instance.disableAutoCompaction();
@@ -140,64 +113,4 @@ public class LongCompactionSpeedTest ext
                                          colsPerRow,
                                          System.currentTimeMillis() - start));
     }
-
-    protected void testAESCountersRepair(int sstableCount, final int rowsPerSSTable, final int colsPerRow) throws Exception
-    {
-        final String cfName = "Counter1";
-        CompactionManager.instance.disableAutoCompaction();
-
-        ArrayList<SSTableReader> sstables = new ArrayList<SSTableReader>();
-        for (int k = 0; k < sstableCount; k++)
-        {
-            final int sstableNum = k;
-            SSTableReader sstable = SSTableUtils.prepare().ks(TABLE1).cf(cfName).write(rowsPerSSTable, new SSTableUtils.Appender(){
-                int written = 0;
-                public boolean append(SSTableWriter writer) throws IOException
-                {
-                    if (written > rowsPerSSTable)
-                        return false;
-
-                    DecoratedKey key = Util.dk(String.format("%020d", written));
-                    ColumnFamily cf = ColumnFamily.create(TABLE1, cfName);
-                    for (int i = 0; i < colsPerRow; i++)
-                        cf.addColumn(createCounterColumn(String.valueOf(i)));
-                    writer.append(key, cf);
-                    written++;
-                    return true;
-                }
-            });
-
-            // whack the index to trigger the recover
-            FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
-            FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.FILTER));
-
-            sstables.add(sstable);
-        }
-
-        // give garbage collection a bit of time to catch up
-        Thread.sleep(1000);
-
-        long start = System.currentTimeMillis();
-
-        for (SSTableReader sstable : sstables)
-            CompactionManager.instance.submitSSTableBuild(sstable.descriptor, OperationType.AES).get();
-
-        System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
-                                         this.getClass().getName(),
-                                         sstableCount,
-                                         rowsPerSSTable,
-                                         colsPerRow,
-                                         System.currentTimeMillis() - start));
-    }
-
-    protected CounterColumn createCounterColumn(String name)
-    {
-        ContextState context = ContextState.allocate(4, 1);
-        context.writeElement(NodeId.fromInt(1), 4L, 2L, true);
-        context.writeElement(NodeId.fromInt(2), 4L, 2L);
-        context.writeElement(NodeId.fromInt(4), 3L, 3L);
-        context.writeElement(NodeId.fromInt(8), 2L, 4L);
-
-        return new CounterColumn(ByteBufferUtil.bytes(name), context.context, 0L);
-    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Tue Jul 26 04:30:43 2011
@@ -31,10 +31,12 @@ import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.apache.cassandra.Util;
+import static org.junit.Assert.assertEquals;
 
 public class SSTableUtils
 {
@@ -74,6 +76,48 @@ public class SSTableUtils
         return datafile;
     }
 
+    public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException
+    {
+        SSTableScanner slhs = lhs.getDirectScanner(2048);
+        SSTableScanner srhs = rhs.getDirectScanner(2048);
+        while (slhs.hasNext())
+        {
+            IColumnIterator ilhs = slhs.next();
+            assert srhs.hasNext() : "LHS contained more rows than RHS";
+            IColumnIterator irhs = srhs.next();
+            assertContentEquals(ilhs, irhs);
+        }
+        assert !srhs.hasNext() : "RHS contained more rows than LHS";
+    }
+
+    public static void assertContentEquals(IColumnIterator lhs, IColumnIterator rhs) throws IOException
+    {
+        assertEquals(lhs.getKey(), rhs.getKey());
+        // check metadata
+        ColumnFamily lcf = lhs.getColumnFamily();
+        ColumnFamily rcf = rhs.getColumnFamily();
+        if (lcf == null)
+        {
+            if (rcf == null)
+                return;
+            throw new AssertionError("LHS had no content for " + rhs.getKey());
+        }
+        else if (rcf == null)
+            throw new AssertionError("RHS had no content for " + lhs.getKey());
+        assertEquals(lcf.getMarkedForDeleteAt(), rcf.getMarkedForDeleteAt());
+        assertEquals(lcf.getLocalDeletionTime(), rcf.getLocalDeletionTime());
+        // iterate columns
+        while (lhs.hasNext())
+        {
+            IColumn clhs = lhs.next();
+            assert rhs.hasNext() : "LHS contained more columns than RHS for " + lhs.getKey();
+            IColumn crhs = rhs.next();
+
+            assertEquals("Mismatched columns for " + lhs.getKey(), clhs, crhs);
+        }
+        assert !rhs.hasNext() : "RHS contained more columns than LHS for " + lhs.getKey();
+    }
+
     /**
      * @return A Context with chainable methods to configure and write a SSTable.
      */
@@ -190,6 +234,7 @@ public class SSTableUtils
             long start = System.currentTimeMillis();
             while (appender.append(writer)) { /* pass */ }
             SSTableReader reader = writer.closeAndOpenReader();
+            reader.acquireReference();
             // mark all components for removal
             if (cleanup)
                 for (Component component : reader.components)

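The new assertContentEquals helpers above walk two sstables with direct
scanners, comparing keys, deletion metadata, and columns row by row; the added
acquireReference() keeps the freshly opened reader from being cleaned up while
a test still holds it. Typical use, as in the counter streaming test below:

    // Compare a locally built "expected" sstable against the streamed result
    // (cleaned and streamed are SSTableReaders built by the test below).
    SSTableUtils.assertContentEquals(cleaned, streamed);
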
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1150984&r1=1150983&r2=1150984&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Jul 26 04:30:43 2011
@@ -20,7 +20,9 @@ package org.apache.cassandra.streaming;
 */
 
 import static junit.framework.Assert.assertEquals;
+import org.apache.cassandra.Util;
 import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.addMutation;
 
 import java.net.InetAddress;
 import java.util.*;
@@ -29,6 +31,7 @@ import org.apache.cassandra.CleanupHelpe
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -41,6 +44,7 @@ import org.apache.cassandra.thrift.Index
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NodeId;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -56,32 +60,26 @@ public class StreamingTransferTest exten
         StorageService.instance.initServer();
     }
 
-    @Test
-    public void testTransferTable() throws Exception
+    /**
+     * Create and transfer a single sstable, and return the keys that should have been transferred.
+     * The Mutator must create the given column, but it may also create any other columns it pleases.
+     */
+    private List<String> createAndTransfer(Table table, ColumnFamilyStore cfs, Mutator mutator) throws Exception
     {
-        Table table = Table.open("Keyspace1");
-        ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
-        
         // write a temporary SSTable, and unregister it
+        long timestamp = 1234;
         for (int i = 1; i <= 3; i++)
-        {
-            String key = "key" + i;
-            String col = "col" + i;
-            RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
-            ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
-            cf.addColumn(column(col, "v", 0));
-            cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes((long) i), 0));
-            rm.add(cf);
-            rm.apply();
-        }
+            mutator.mutate("key" + i, "col" + i, timestamp);
         cfs.forceBlockingFlush();
-        assert cfs.getSSTables().size() == 1;
+        Util.compactAll(cfs).get();
+        assertEquals(1, cfs.getSSTables().size());
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         // We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it
         sstable.acquireReference();
         cfs.removeAllSSTables();
 
         // transfer the first and last key
+        int[] offs = new int[]{1, 3};
         IPartitioner p = StorageService.getPartitioner();
         List<Range> ranges = new ArrayList<Range>();
         ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
@@ -90,27 +88,133 @@ public class StreamingTransferTest exten
         StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
         session.await();
 
-        // confirm that the SSTable was transferred and registered
-        List<Row> rows = Util.getRangeSlice(cfs);
-        assertEquals(2, rows.size());
-        assert rows.get(0).key.key.equals( ByteBufferUtil.bytes("key1"));
-        assert rows.get(1).key.key.equals( ByteBufferUtil.bytes("key3"));
-        assertEquals(2, rows.get(0).cf.getColumnsMap().size());
-        assertEquals(2, rows.get(1).cf.getColumnsMap().size());
-        assert rows.get(1).cf.getColumn(ByteBufferUtil.bytes("col3")) != null;
+        // confirm that a single SSTable was transferred and registered
+        assertEquals(1, cfs.getSSTables().size());
 
         // and that the index and filter were properly recovered
-        assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key1"), new QueryPath(cfs.columnFamily)));
-        assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key3"), new QueryPath(cfs.columnFamily)));
+        List<Row> rows = Util.getRangeSlice(cfs);
+        assertEquals(offs.length, rows.size());
+        for (int i = 0; i < offs.length; i++)
+        {
+            String key = "key" + offs[i];
+            String col = "col" + offs[i];
+            assert null != cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(key),
+                                               new QueryPath(cfs.columnFamily)));
+            assert rows.get(i).key.key.equals(ByteBufferUtil.bytes(key));
+            assert rows.get(i).cf.getColumn(ByteBufferUtil.bytes(col)) != null;
+        }
 
-        // and that the secondary index works
-        IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, ByteBufferUtil.bytes(3L));
-        IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
-        IFilter filter = new IdentityQueryFilter();
-        Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
-        rows = cfs.scan(clause, range, filter);
-        assertEquals(1, rows.size());
-        assert rows.get(0).key.key.equals( ByteBufferUtil.bytes("key3")) ;
+        // and that the max timestamp for the file was rediscovered
+        assertEquals(timestamp, cfs.getSSTables().iterator().next().getMaxTimestamp());
+        
+        List<String> keys = new ArrayList<String>();
+        for (int off : offs)
+            keys.add("key" + off);
+        return keys;
+    }
+
+    @Test
+    public void testTransferTable() throws Exception
+    {
+        final Table table = Table.open("Keyspace1");
+        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
+        
+        List<String> keys = createAndTransfer(table, cfs, new Mutator()
+        {
+            public void mutate(String key, String col, long timestamp) throws Exception
+            {
+                long val = key.hashCode();
+                RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes(key));
+                ColumnFamily cf = ColumnFamily.create(table.name, cfs.columnFamily);
+                cf.addColumn(column(col, "v", timestamp));
+                cf.addColumn(new Column(ByteBufferUtil.bytes("birthdate"), ByteBufferUtil.bytes(val), timestamp));
+                rm.add(cf);
+                rm.apply();
+            }
+        });
+
+        // confirm that the secondary index was recovered
+        for (String key : keys)
+        {
+            long val = key.hashCode();
+            IPartitioner p = StorageService.getPartitioner();
+            IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"),
+                                                       IndexOperator.EQ,
+                                                       ByteBufferUtil.bytes(val));
+            IndexClause clause = new IndexClause(Arrays.asList(expr), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+            IFilter filter = new IdentityQueryFilter();
+            Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+            List<Row> rows = cfs.scan(clause, range, filter);
+            assertEquals(1, rows.size());
+            assert rows.get(0).key.key.equals(ByteBufferUtil.bytes(key));
+        }
+    }
+
+    @Test
+    public void testTransferTableSuper() throws Exception
+    {
+        final Table table = Table.open("Keyspace1");
+        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Super1");
+        
+        createAndTransfer(table, cfs, new Mutator()
+        {
+            public void mutate(String key, String col, long timestamp) throws Exception
+            {
+                RowMutation rm = new RowMutation(table.name, ByteBufferUtil.bytes(key));
+                addMutation(rm, cfs.columnFamily, col, 1, "val1", timestamp);
+                rm.apply();
+            }
+        });
+    }
+
+    @Test
+    public void testTransferTableCounter() throws Exception
+    {
+        final Table table = Table.open("Keyspace1");
+        final ColumnFamilyStore cfs = table.getColumnFamilyStore("Counter1");
+        final CounterContext cc = new CounterContext();
+        
+        final Map<String, ColumnFamily> cleanedEntries = new HashMap<String, ColumnFamily>();
+
+        List<String> keys = createAndTransfer(table, cfs, new Mutator()
+        {
+            /** Creates a new SSTable per key: all will be merged before streaming. */
+            public void mutate(String key, String col, long timestamp) throws Exception
+            {
+                Map<String, ColumnFamily> entries = new HashMap<String, ColumnFamily>();
+                ColumnFamily cf = ColumnFamily.create(cfs.metadata);
+                ColumnFamily cfCleaned = ColumnFamily.create(cfs.metadata);
+                CounterContext.ContextState state = CounterContext.ContextState.allocate(4, 1);
+                state.writeElement(NodeId.fromInt(2), 9L, 3L, true);
+                state.writeElement(NodeId.fromInt(4), 4L, 2L);
+                state.writeElement(NodeId.fromInt(6), 3L, 3L);
+                state.writeElement(NodeId.fromInt(8), 2L, 4L);
+                cf.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
+                                               state.context,
+                                               timestamp));
+                cfCleaned.addColumn(new CounterColumn(ByteBufferUtil.bytes(col),
+                                                      cc.clearAllDelta(state.context),
+                                                      timestamp));
+
+                entries.put(key, cf);
+                cleanedEntries.put(key, cfCleaned);
+                cfs.addSSTable(SSTableUtils.prepare()
+                    .ks(table.name)
+                    .cf(cfs.columnFamily)
+                    .generation(0)
+                    .write(entries));
+            }
+        });
+
+        // filter pre-cleaned entries locally, and ensure that the end result is equal
+        cleanedEntries.keySet().retainAll(keys);
+        SSTableReader cleaned = SSTableUtils.prepare()
+            .ks(table.name)
+            .cf(cfs.columnFamily)
+            .generation(0)
+            .write(cleanedEntries);
+        SSTableReader streamed = cfs.getSSTables().iterator().next();
+        SSTableUtils.assertContentEquals(cleaned, streamed);
     }
 
     @Test
@@ -208,4 +312,9 @@ public class StreamingTransferTest exten
             assertEquals(entry.getKey(), rows.get(0).key);
         }
     }
+ 
+    public interface Mutator
+    {
+        public void mutate(String key, String col, long timestamp) throws Exception;
+    }
 }