Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/20 17:07:10 UTC
svn commit: r1148811 - /cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Author: jbellis
Date: Wed Jul 20 15:07:09 2011
New Revision: 1148811
URL: http://svn.apache.org/viewvc?rev=1148811&view=rev
Log:
add cleanupIfNecessary for single-pass streaming SSTable build
patch by Yuki Morishita; reviewed by stuhood and jbellis for CASSANDRA-2906
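
The change wraps the single-pass SSTable build in a try/finally so that SSTableWriter.cleanupIfNecessary() runs even if the build fails partway through, presumably discarding the partially written sstable instead of leaving it on disk. The self-contained Java sketch below illustrates only that try/finally shape; the Writer class, its fields, and its console output are simplified stand-ins invented for illustration, not the actual SSTableWriter implementation.

import java.io.IOException;

public class CleanupSketch
{
    // Hypothetical stand-in for SSTableWriter; only the close/cleanup shape
    // is meant to mirror the real class.
    static class Writer
    {
        private boolean closed = false;

        Object closeAndOpenReader()
        {
            closed = true;
            return new Object(); // stand-in for the SSTableReader handed back to the caller
        }

        // In the spirit of cleanupIfNecessary(): a no-op after a clean close,
        // otherwise the partially written components would be discarded.
        void cleanupIfNecessary()
        {
            if (!closed)
                System.out.println("discarding partial sstable components");
        }
    }

    static Object streamIn(Writer writer) throws IOException
    {
        try
        {
            // ... deserialize each streamed row and append it to the writer ...
            return writer.closeAndOpenReader();
        }
        finally
        {
            // Runs whether the loop above succeeded or threw: after a successful
            // closeAndOpenReader() this does nothing, but if an exception escaped
            // mid-stream the partial output is cleaned up instead of leaking.
            writer.cleanupIfNecessary();
        }
    }

    public static void main(String[] args) throws IOException
    {
        streamIn(new Writer());
    }
}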
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1148811&r1=1148810&r2=1148811&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jul 20 15:07:09 2011
@@ -161,80 +161,87 @@ public class IncomingStreamReader
         SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
         CompactionController controller = null;
-        BytesReadTracker in = new BytesReadTracker(input);
-
-        for (Pair<Long, Long> section : localFile.sections)
+        try
         {
-            long length = section.right - section.left;
-            long bytesRead = 0;
-            while (bytesRead < length)
+            BytesReadTracker in = new BytesReadTracker(input);
+
+            for (Pair<Long, Long> section : localFile.sections)
             {
-                in.reset();
-                key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
-                long dataSize = SSTableReader.readRowSize(in, localFile.desc);
-                ColumnFamily cf = null;
-                if (cfs.metadata.getDefaultValidator().isCommutative())
+                long length = section.right - section.left;
+                long bytesRead = 0;
+                while (bytesRead < length)
                 {
-                    // take care of counter column family
-                    if (controller == null)
-                        controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
-                    SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
-                    AbstractCompactedRow row = controller.getCompactedRow(iter);
-                    writer.append(row);
-
-                    if (row instanceof PrecompactedRow)
+                    in.reset();
+                    key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
+                    long dataSize = SSTableReader.readRowSize(in, localFile.desc);
+                    ColumnFamily cf = null;
+                    if (cfs.metadata.getDefaultValidator().isCommutative())
                     {
-                        // we do not purge so we should not get a null here
-                        cf = ((PrecompactedRow)row).getFullColumnFamily();
+                        // take care of counter column family
+                        if (controller == null)
+                            controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
+                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+                        AbstractCompactedRow row = controller.getCompactedRow(iter);
+                        writer.append(row);
+
+                        if (row instanceof PrecompactedRow)
+                        {
+                            // we do not purge so we should not get a null here
+                            cf = ((PrecompactedRow)row).getFullColumnFamily();
+                        }
                     }
-                }
-                else
-                {
-                    // skip BloomFilter
-                    IndexHelper.skipBloomFilter(in);
-                    // skip Index
-                    IndexHelper.skipIndex(in);
-
-                    // restore ColumnFamily
-                    cf = ColumnFamily.create(cfs.metadata);
-                    ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
-                    ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
+                    else
+                    {
+                        // skip BloomFilter
+                        IndexHelper.skipBloomFilter(in);
+                        // skip Index
+                        IndexHelper.skipIndex(in);
+
+                        // restore ColumnFamily
+                        cf = ColumnFamily.create(cfs.metadata);
+                        ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
+                        ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
 
-                    // write key and cf
-                    writer.append(key, cf);
-                }
+                        // write key and cf
+                        writer.append(key, cf);
+                    }
 
-                // update cache
-                ColumnFamily cached = cfs.getRawCachedRow(key);
-                if (cached != null)
-                {
-                    switch (remoteFile.type)
+                    // update cache
+                    ColumnFamily cached = cfs.getRawCachedRow(key);
+                    if (cached != null)
                     {
-                        case AES:
-                            if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-                            {
-                                // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
-                                // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
-                                logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+                        switch (remoteFile.type)
+                        {
+                            case AES:
+                                if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+                                {
+                                    // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
+                                    // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
+                                    logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+                                    cfs.invalidateCachedRow(key);
+                                }
+                                else
+                                {
+                                    assert cf != null;
+                                    cfs.updateRowCache(key, cf);
+                                }
+                                break;
+                            default:
                                 cfs.invalidateCachedRow(key);
-                            }
-                            else
-                            {
-                                assert cf != null;
-                                cfs.updateRowCache(key, cf);
-                            }
-                            break;
-                        default:
-                            cfs.invalidateCachedRow(key);
-                            break;
+                                break;
+                        }
                     }
-                }
-                bytesRead += in.getBytesRead();
-                remoteFile.progress += in.getBytesRead();
+                    bytesRead += in.getBytesRead();
+                    remoteFile.progress += in.getBytesRead();
+                }
             }
+            return writer.closeAndOpenReader();
+        }
+        finally
+        {
+            writer.cleanupIfNecessary();
         }
-        return writer.closeAndOpenReader();
     }
 
     private void retry() throws IOException
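
The inner loop in the hunk above resets a BytesReadTracker before deserializing each row and then adds getBytesRead() to both the per-section byte count and remoteFile.progress. The sketch below shows the same counting idea with a simplified wrapper; CountingInput, resetCounter(), and the main() driver are illustrative stand-ins, not Cassandra's BytesReadTracker.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ProgressSketch
{
    // Counts how many bytes pass through the stream; the counter is cleared
    // before each row so getBytesRead() reports that row's size on disk.
    static class CountingInput extends FilterInputStream
    {
        private long bytesRead = 0;

        CountingInput(InputStream in)
        {
            super(in);
        }

        @Override
        public int read() throws IOException
        {
            int b = super.read();
            if (b != -1)
                bytesRead++;
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException
        {
            int n = super.read(buf, off, len);
            if (n > 0)
                bytesRead += n;
            return n;
        }

        long getBytesRead()
        {
            return bytesRead;
        }

        void resetCounter()
        {
            bytesRead = 0;
        }
    }

    public static void main(String[] args) throws IOException
    {
        byte[] section = new byte[64]; // pretend this is one streamed file section
        CountingInput in = new CountingInput(new ByteArrayInputStream(section));
        DataInputStream data = new DataInputStream(in);

        long length = section.length;
        long bytesRead = 0;
        long progress = 0;
        while (bytesRead < length)
        {
            in.resetCounter();
            data.readLong();                  // stand-in for deserializing one row
            bytesRead += in.getBytesRead();   // advance within this section
            progress += in.getBytesRead();    // overall transfer progress
        }
        System.out.println("read " + progress + " bytes");
    }
}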