You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/20 17:07:10 UTC

svn commit: r1148811 - /cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java

Author: jbellis
Date: Wed Jul 20 15:07:09 2011
New Revision: 1148811

URL: http://svn.apache.org/viewvc?rev=1148811&view=rev
Log:
add cleanupIfNecessary for single-pass streaming SSTable build
patch by Yuki Morishita; reviewed by stuhood and jbellis for CASSANDRA-2906

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1148811&r1=1148810&r2=1148811&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jul 20 15:07:09 2011
@@ -161,80 +161,87 @@ public class IncomingStreamReader
         SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
         CompactionController controller = null;
 
-        BytesReadTracker in = new BytesReadTracker(input);
-
-        for (Pair<Long, Long> section : localFile.sections)
+        try
         {
-            long length = section.right - section.left;
-            long bytesRead = 0;
-            while (bytesRead < length)
+            BytesReadTracker in = new BytesReadTracker(input);
+
+            for (Pair<Long, Long> section : localFile.sections)
             {
-                in.reset();
-                key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
-                long dataSize = SSTableReader.readRowSize(in, localFile.desc);
-                ColumnFamily cf = null;
-                if (cfs.metadata.getDefaultValidator().isCommutative())
+                long length = section.right - section.left;
+                long bytesRead = 0;
+                while (bytesRead < length)
                 {
-                    // take care of counter column family
-                    if (controller == null)
-                        controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
-                    SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
-                    AbstractCompactedRow row = controller.getCompactedRow(iter);
-                    writer.append(row);
-
-                    if (row instanceof PrecompactedRow)
+                    in.reset();
+                    key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
+                    long dataSize = SSTableReader.readRowSize(in, localFile.desc);
+                    ColumnFamily cf = null;
+                    if (cfs.metadata.getDefaultValidator().isCommutative())
                     {
-                        // we do not purge so we should not get a null here
-                        cf = ((PrecompactedRow)row).getFullColumnFamily();
+                        // take care of counter column family
+                        if (controller == null)
+                            controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
+                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+                        AbstractCompactedRow row = controller.getCompactedRow(iter);
+                        writer.append(row);
+
+                        if (row instanceof PrecompactedRow)
+                        {
+                            // we do not purge so we should not get a null here
+                            cf = ((PrecompactedRow)row).getFullColumnFamily();
+                        }
                     }
-                }
-                else
-                {
-                    // skip BloomFilter
-                    IndexHelper.skipBloomFilter(in);
-                    // skip Index
-                    IndexHelper.skipIndex(in);
-
-                    // restore ColumnFamily
-                    cf = ColumnFamily.create(cfs.metadata);
-                    ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
-                    ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
+                    else
+                    {
+                        // skip BloomFilter
+                        IndexHelper.skipBloomFilter(in);
+                        // skip Index
+                        IndexHelper.skipIndex(in);
+
+                        // restore ColumnFamily
+                        cf = ColumnFamily.create(cfs.metadata);
+                        ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
+                        ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
 
-                    // write key and cf
-                    writer.append(key, cf);
-                }
+                        // write key and cf
+                        writer.append(key, cf);
+                    }
 
-                // update cache
-                ColumnFamily cached = cfs.getRawCachedRow(key);
-                if (cached != null)
-                {
-                    switch (remoteFile.type)
+                    // update cache
+                    ColumnFamily cached = cfs.getRawCachedRow(key);
+                    if (cached != null)
                     {
-                        case AES:
-                            if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-                            {
-                                // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
-                                // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
-                                logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+                        switch (remoteFile.type)
+                        {
+                            case AES:
+                                if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+                                {
+                                    // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
+                                    // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
+                                    logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+                                    cfs.invalidateCachedRow(key);
+                                }
+                                else
+                                {
+                                    assert cf != null;
+                                    cfs.updateRowCache(key, cf);
+                                }
+                                break;
+                            default:
                                 cfs.invalidateCachedRow(key);
-                            }
-                            else
-                            {
-                                assert cf != null;
-                                cfs.updateRowCache(key, cf);
-                            }
-                            break;
-                        default:
-                            cfs.invalidateCachedRow(key);
-                            break;
+                                break;
+                        }
                     }
-                }
 
-                bytesRead += in.getBytesRead();
-                remoteFile.progress += in.getBytesRead();
+                    bytesRead += in.getBytesRead();
+                    remoteFile.progress += in.getBytesRead();
+                }
             }
+            return writer.closeAndOpenReader();
+        }
+        finally
+        {
+            writer.cleanupIfNecessary();
         }
-        return writer.closeAndOpenReader();
     }
 
     private void retry() throws IOException