You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/13 18:24:31 UTC
svn commit: r1146117 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/streaming/
Author: jbellis
Date: Wed Jul 13 16:24:31 2011
New Revision: 1146117
URL: http://svn.apache.org/viewvc?rev=1146117&view=rev
Log:
single-pass streaming
patch by Yuki Morishita; reviewed by jbellis for CASSANDRA-2677
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jul 13 16:24:31 2011
@@ -12,6 +12,7 @@
* don't bother persisting columns shadowed by a row tombstone (CASSANDRA-2589)
* reset CF and SC deletion times after gc_grace (CASSANDRA-2317)
* optimize away seek when compacting wide rows (CASSANDRA-2879)
+ * single-pass streaming (CASSANDRA-2677)
0.8.2
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Wed Jul 13 16:24:31 2011
@@ -131,6 +131,12 @@ public class ColumnFamilySerializer impl
public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, boolean fromRemote) throws IOException
{
int size = dis.readInt();
+ deserializeColumns(dis, cf, size, intern, fromRemote);
+ }
+
+ /* column count is already read from DataInput */
+ public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean intern, boolean fromRemote) throws IOException
+ {
ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
for (int i = 0; i < size; ++i)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Wed Jul 13 16:24:31 2011
@@ -42,27 +42,45 @@ public class IndexHelper
* @param in the data input from which the bloom filter should be skipped
* @throws IOException
*/
- public static void skipBloomFilter(FileDataInput in) throws IOException
+ public static void skipBloomFilter(DataInput in) throws IOException
{
/* size of the bloom filter */
int size = in.readInt();
/* skip the serialized bloom filter */
- FileUtils.skipBytesFully(in, size);
+ if (in instanceof FileDataInput)
+ {
+ FileUtils.skipBytesFully(in, size);
+ }
+ else
+ {
+ // skip bytes
+ byte[] skip = new byte[size];
+ in.readFully(skip);
+ }
}
- /**
- * Skip the index
- * @param file the data input from which the index should be skipped
- * @throws IOException if an I/O error occurs.
- */
- public static void skipIndex(FileDataInput file) throws IOException
- {
+ /**
+ * Skip the index
+ * @param in the data input from which the index should be skipped
+ * @throws IOException if an I/O error occurs.
+ */
+ public static void skipIndex(DataInput in) throws IOException
+ {
/* read only the column index list */
- int columnIndexSize = file.readInt();
+ int columnIndexSize = in.readInt();
/* skip the column index data */
- FileUtils.skipBytesFully(file, columnIndexSize);
- }
-
+ if (in instanceof FileDataInput)
+ {
+ FileUtils.skipBytesFully(in, columnIndexSize);
+ }
+ else
+ {
+ // skip bytes
+ byte[] skip = new byte[columnIndexSize];
+ in.readFully(skip);
+ }
+ }
+
/**
* Deserialize the index into a structure and return it
*
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Jul 13 16:24:31 2011
@@ -21,11 +21,11 @@ package org.apache.cassandra.io.sstable;
*/
+import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOError;
import java.io.IOException;
-import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.Filter;
+import org.apache.cassandra.utils.BytesReadTracker;
public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
{
@@ -45,14 +45,16 @@ public class SSTableIdentityIterator imp
private final DecoratedKey key;
private final long finishedAt;
- private final BufferedRandomAccessFile file;
+ private final DataInput input;
private final long dataStart;
public final long dataSize;
public final boolean fromRemote;
private final ColumnFamily columnFamily;
public final int columnCount;
- private final long columnPosition;
+ private long columnPosition;
+
+ private BytesReadTracker inputWithTracker; // tracks bytes read
// Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
private final int expireBefore;
@@ -90,17 +92,18 @@ public class SSTableIdentityIterator imp
this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
}
- public SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
+ public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
throws IOException
{
this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
}
// sstable may be null *if* deserializeRowHeader is false
- private SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+ private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
throws IOException
{
- this.file = file;
+ this.input = input;
+ this.inputWithTracker = new BytesReadTracker(input);
this.key = key;
this.dataStart = dataStart;
this.dataSize = dataSize;
@@ -111,38 +114,47 @@ public class SSTableIdentityIterator imp
try
{
- file.seek(this.dataStart);
- if (checkData)
+ if (input instanceof BufferedRandomAccessFile)
{
- try
- {
- IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
- }
- catch (Exception e)
- {
- if (e instanceof EOFException)
- throw (EOFException) e;
-
- logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
- // deFreeze should have left the file position ready to deserialize index
- }
- try
- {
- IndexHelper.deserializeIndex(file);
- }
- catch (Exception e)
+ BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+ file.seek(this.dataStart);
+ if (checkData)
{
- logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+ try
+ {
+ IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
+ }
+ catch (Exception e)
+ {
+ if (e instanceof EOFException)
+ throw (EOFException) e;
+
+ logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
+ // deFreeze should have left the file position ready to deserialize index
+ }
+ try
+ {
+ IndexHelper.deserializeIndex(file);
+ }
+ catch (Exception e)
+ {
+ logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+ }
+ file.seek(this.dataStart);
}
- file.seek(this.dataStart);
}
- IndexHelper.skipBloomFilter(file);
- IndexHelper.skipIndex(file);
+ IndexHelper.skipBloomFilter(inputWithTracker);
+ IndexHelper.skipIndex(inputWithTracker);
columnFamily = ColumnFamily.create(metadata);
- ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, file);
- columnCount = file.readInt();
- columnPosition = file.getFilePointer();
+ ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker);
+ columnCount = inputWithTracker.readInt();
+
+ if (input instanceof BufferedRandomAccessFile)
+ {
+ BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+ columnPosition = file.getFilePointer();
+ }
}
catch (IOException e)
{
@@ -162,14 +174,22 @@ public class SSTableIdentityIterator imp
public boolean hasNext()
{
- return file.getFilePointer() < finishedAt;
+ if (input instanceof BufferedRandomAccessFile)
+ {
+ BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+ return file.getFilePointer() < finishedAt;
+ }
+ else
+ {
+ return inputWithTracker.getBytesRead() < dataSize;
+ }
}
public IColumn next()
{
try
{
- IColumn column = columnFamily.getColumnSerializer().deserialize(file, null, fromRemote, expireBefore);
+ IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, null, fromRemote, expireBefore);
if (validateColumns)
column.validateFields(columnFamily.metadata());
return column;
@@ -196,23 +216,50 @@ public class SSTableIdentityIterator imp
public String getPath()
{
- return file.getPath();
+ // if input is from file, then return that path, otherwise it's from streaming
+ if (input instanceof BufferedRandomAccessFile)
+ {
+ BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+ return file.getPath();
+ }
+ else
+ {
+ throw new UnsupportedOperationException();
+ }
}
public void echoData(DataOutput out) throws IOException
{
- file.seek(dataStart);
- while (file.getFilePointer() < finishedAt)
+ // only effective when input is from file
+ if (input instanceof BufferedRandomAccessFile)
{
- out.write(file.readByte());
+ BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+ file.seek(dataStart);
+ while (file.getFilePointer() < finishedAt)
+ {
+ out.write(file.readByte());
+ }
+ }
+ else
+ {
+ throw new UnsupportedOperationException();
}
}
public ColumnFamily getColumnFamilyWithColumns() throws IOException
{
- file.seek(columnPosition - 4); // seek to before column count int
ColumnFamily cf = columnFamily.cloneMeShallow();
- ColumnFamily.serializer().deserializeColumns(file, cf, false, fromRemote);
+ if (input instanceof BufferedRandomAccessFile)
+ {
+ BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+ file.seek(columnPosition - 4); // seek to before column count int
+ ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, false, fromRemote);
+ }
+ else
+ {
+ // since we already read column count, just pass that value and continue deserialization
+ ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote);
+ }
if (validateColumns)
{
try
@@ -234,13 +281,23 @@ public class SSTableIdentityIterator imp
public void reset()
{
- try
+ // only effective when input is from file
+ if (input instanceof BufferedRandomAccessFile)
{
- file.seek(columnPosition);
+ BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
+ try
+ {
+ file.seek(columnPosition);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ inputWithTracker.reset();
}
- catch (IOException e)
+ else
{
- throw new IOError(e);
+ throw new UnsupportedOperationException();
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Jul 13 16:24:31 2011
@@ -382,6 +382,19 @@ public class SSTableReader extends SSTab
{
return indexSummary.getIndexPositions().size() * DatabaseDescriptor.getIndexInterval();
}
+
+ /**
+ * @param ranges
+ * @return An estimate of the number of keys for given ranges in this SSTable.
+ */
+ public long estimatedKeysForRanges(Collection<Range> ranges)
+ {
+ long sampleKeyCount = 0;
+ List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary.getIndexPositions(), ranges);
+ for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
+ sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
+ return sampleKeyCount * DatabaseDescriptor.getIndexInterval();
+ }
/**
* @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Jul 13 16:24:31 2011
@@ -277,6 +277,8 @@ public class SSTableWriter extends SSTab
/**
* Removes the given SSTable from temporary status and opens it, rebuilding the
* bloom filter and row index from the data file.
+ *
+ * TODO remove this post-1.0, we have one-pass streaming now (see IncomingStreamReader)
*/
public static class Builder implements CompactionInfo.Holder
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed Jul 13 16:24:31 2011
@@ -18,18 +18,32 @@
package org.apache.cassandra.streaming;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
+import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
public class IncomingStreamReader
@@ -70,51 +84,163 @@ public class IncomingStreamReader
logger.debug("Receiving stream");
logger.debug("Creating file for {}", localFile.getFilename());
}
- FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
- FileChannel fc = fos.getChannel();
- long offset = 0;
- try
+ SSTableReader reader = null;
+ if (remoteFile.estimatedKeys > 0)
{
- for (Pair<Long, Long> section : localFile.sections)
+ logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
+ DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream());
+ try
{
- long length = section.right - section.left;
- long bytesRead = 0;
- while (bytesRead < length)
- {
- bytesRead = readnwrite(length, bytesRead, offset, fc);
- }
- offset += length;
+ reader = streamIn(dis, localFile, remoteFile);
+ }
+ catch (IOException ex)
+ {
+ retry();
+ throw ex;
+ }
+ finally
+ {
+ dis.close();
}
}
- catch (IOException ex)
+ else
{
- /* Ask the source node to re-stream this file. */
- session.retry(remoteFile);
+ // backwards compatibility path
+ FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
+ FileChannel fc = fos.getChannel();
- /* Delete the orphaned file. */
- FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
- throw ex;
- }
- finally
- {
- fc.close();
+ long offset = 0;
+ try
+ {
+ for (Pair<Long, Long> section : localFile.sections)
+ {
+ long length = section.right - section.left;
+ long bytesRead = 0;
+ while (bytesRead < length)
+ {
+ bytesRead = readnwrite(length, bytesRead, offset, fc);
+ }
+ offset += length;
+ }
+ }
+ catch (IOException ex)
+ {
+ retry();
+ throw ex;
+ }
+ finally
+ {
+ fc.close();
+ }
}
- session.finished(remoteFile, localFile);
+ session.finished(remoteFile, localFile, reader);
}
protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
{
long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
- // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
- // and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and
- // raise an exception (that will be catch belove and 'the right thing' will be done).
- if (lastRead == 0)
- throw new IOException("Transfer failed for remote file " + remoteFile);
+ // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 bytes read
+ // and we would thus enter an infinite loop. So instead, if no bytes are transferred we assume the other side is dead and
+ // raise an exception (that will be caught below and 'the right thing' will be done).
+ if (lastRead == 0)
+ throw new IOException("Transfer failed for remote file " + remoteFile);
bytesRead += lastRead;
remoteFile.progress += lastRead;
return bytesRead;
}
+
+ private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
+ {
+ ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
+ DecoratedKey key;
+ SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
+ CompactionController controller = null;
+
+ BytesReadTracker in = new BytesReadTracker(input);
+
+ for (Pair<Long, Long> section : localFile.sections)
+ {
+ long length = section.right - section.left;
+ long bytesRead = 0;
+ while (bytesRead < length)
+ {
+ in.reset();
+ key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
+ long dataSize = SSTableReader.readRowSize(in, localFile.desc);
+ ColumnFamily cf = null;
+ if (cfs.metadata.getDefaultValidator().isCommutative())
+ {
+ // take care of counter column family
+ if (controller == null)
+ controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
+ SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+ AbstractCompactedRow row = controller.getCompactedRow(iter);
+ writer.append(row);
+
+ if (row instanceof PrecompactedRow)
+ {
+ // we do not purge so we should not get a null here
+ cf = ((PrecompactedRow)row).getFullColumnFamily();
+ }
+ }
+ else
+ {
+ // skip BloomFilter
+ IndexHelper.skipBloomFilter(in);
+ // skip Index
+ IndexHelper.skipIndex(in);
+
+ // restore ColumnFamily
+ cf = ColumnFamily.create(cfs.metadata);
+ ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
+ ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
+
+ // write key and cf
+ writer.append(key, cf);
+ }
+
+ // update cache
+ ColumnFamily cached = cfs.getRawCachedRow(key);
+ if (cached != null)
+ {
+ switch (remoteFile.type)
+ {
+ case AES:
+ if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
+ {
+ // We have a key in cache for a very big row, which is fishy. We don't fail here, however, because that would prevent the sstable
+ // from being built (and there is no real point anyway), so we just invalidate the row for correctness and log a warning.
+ logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
+ cfs.invalidateCachedRow(key);
+ }
+ else
+ {
+ assert cf != null;
+ cfs.updateRowCache(key, cf);
+ }
+ break;
+ default:
+ cfs.invalidateCachedRow(key);
+ break;
+ }
+ }
+
+ bytesRead += in.getBytesRead();
+ remoteFile.progress += in.getBytesRead();
+ }
+ }
+ return writer.closeAndOpenReader();
+ }
+
+ private void retry() throws IOException
+ {
+ /* Ask the source node to re-stream this file. */
+ session.retry(remoteFile);
+
+ /* Delete the orphaned file. */
+ FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Wed Jul 13 16:24:31 2011
@@ -53,15 +53,21 @@ public class PendingFile
public final List<Pair<Long,Long>> sections;
public final OperationType type;
public final long size;
+ public final long estimatedKeys;
public long progress;
public PendingFile(Descriptor desc, PendingFile pf)
{
- this(null, desc, pf.component, pf.sections, pf.type);
+ this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys);
}
public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
{
+ this(sstable, desc, component, sections, type, 0);
+ }
+
+ public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
+ {
this.sstable = sstable;
this.desc = desc;
this.component = component;
@@ -74,6 +80,8 @@ public class PendingFile
tempSize += section.right - section.left;
}
size = tempSize;
+
+ this.estimatedKeys = estimatedKeys;
}
public String getFilename()
@@ -119,6 +127,8 @@ public class PendingFile
}
if (version > MessagingService.VERSION_07)
dos.writeUTF(sc.type.name());
+ if (version > MessagingService.VERSION_080)
+ dos.writeLong(sc.estimatedKeys);
}
public PendingFile deserialize(DataInputStream dis, int version) throws IOException
@@ -137,7 +147,10 @@ public class PendingFile
OperationType type = OperationType.RESTORE_REPLICA_COUNT;
if (version > MessagingService.VERSION_07)
type = OperationType.valueOf(dis.readUTF());
- return new PendingFile(null, desc, component, sections, type);
+ long estimatedKeys = 0;
+ if (version > MessagingService.VERSION_080)
+ estimatedKeys = dis.readLong();
+ return new PendingFile(null, desc, component, sections, type, estimatedKeys);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Jul 13 16:24:31 2011
@@ -51,6 +51,7 @@ public class StreamInSession
private final Runnable callback;
private String table;
private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>();
+ private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
private PendingFile current;
private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -102,13 +103,21 @@ public class StreamInSession
}
}
- public void finished(PendingFile remoteFile, PendingFile localFile) throws IOException
+ public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException
{
if (logger.isDebugEnabled())
logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
- Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
- buildFutures.add(future);
+ if (reader != null)
+ {
+ // SSTR was already built during streaming
+ readers.add(reader);
+ }
+ else
+ {
+ Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
+ buildFutures.add(future);
+ }
files.remove(remoteFile);
if (remoteFile.equals(current))
@@ -152,6 +161,16 @@ public class StreamInSession
throw new RuntimeException(e);
}
}
+
+ for (SSTableReader sstable : readers)
+ {
+ assert sstable.getTableName().equals(table);
+ ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+ cfs.addSSTable(sstable);
+ if (!cfstores.containsKey(cfs))
+ cfstores.put(cfs, new ArrayList<SSTableReader>());
+ cfstores.get(cfs).add(sstable);
+ }
// build secondary indexes
for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1146117&r1=1146116&r2=1146117&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Wed Jul 13 16:24:31 2011
@@ -151,7 +151,7 @@ public class StreamOut
List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
if (sections.isEmpty())
continue;
- pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
+ pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeysForRanges(ranges)));
}
logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));
return pending;