You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this text.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/04 15:02:06 UTC
svn commit: r1142647 - in /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra: db/ hadoop/ io/sstable/ streaming/
Author: jbellis
Date: Mon Jul 4 13:02:05 2011
New Revision: 1142647
URL: http://svn.apache.org/viewvc?rev=1142647&view=rev
Log:
revert incomplete changes
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Mon Jul 4 13:02:05 2011
@@ -130,12 +130,6 @@ public class ColumnFamilySerializer impl
public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean intern, boolean fromRemote) throws IOException
{
int size = dis.readInt();
- deserializeColumns(dis, cf, size, intern, fromRemote);
- }
-
- /* column count is already read from DataInput */
- public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean intern, boolean fromRemote) throws IOException
- {
ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
for (int i = 0; i < size; ++i)
{
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jul 4 13:02:05 2011
@@ -35,9 +35,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TException;
@@ -100,43 +101,11 @@ public class ColumnFamilyInputFormat ext
try
{
- KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
- IPartitioner partitioner = null;
- Range jobRange = null;
- if (jobKeyRange != null)
- {
- partitioner = ConfigHelper.getPartitioner(context.getConfiguration());
- assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
- jobRange = new Range(partitioner.getToken(jobKeyRange.start_key),
- partitioner.getToken(jobKeyRange.end_key),
- partitioner);
- }
-
List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
for (TokenRange range : masterRangeNodes)
{
- if (jobRange == null)
- {
// for each range, pick a live owner and ask it to compute bite-sized splits
splitfutures.add(executor.submit(new SplitCallable(range, conf)));
- }
- else
- {
- Range dhtRange = new Range(partitioner.getTokenFactory().fromString(range.start_token),
- partitioner.getTokenFactory().fromString(range.end_token),
- partitioner);
-
- if (dhtRange.intersects(jobRange))
- {
- Set<Range> intersections = dhtRange.intersectionWith(jobRange);
- assert intersections.size() == 1 : "wrapping ranges not supported";
- Range intersection = intersections.iterator().next();
- range.start_token = partitioner.getTokenFactory().toString(intersection.left);
- range.end_token = partitioner.getTokenFactory().toString(intersection.right);
- // for each range, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, conf)));
- }
- }
}
// wait until we have all the results back
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Jul 4 13:02:05 2011
@@ -22,7 +22,6 @@ package org.apache.cassandra.hadoop;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.FBUtilities;
@@ -43,7 +42,6 @@ public class ConfigHelper
private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily";
private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate";
- private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange";
private static final String OUTPUT_PREDICATE_CONFIG = "cassandra.output.predicate";
private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
@@ -197,51 +195,6 @@ public class ConfigHelper
return predicate;
}
- /**
- * Set the KeyRange to limit the rows.
- * @param conf Job configuration you are about to run
- * @param keyRange
- */
- public static void setInputKeyRange(Configuration conf, KeyRange keyRange){
- conf.set(INPUT_KEYRANGE_CONFIG, keyRangeToString(keyRange));
- }
-
- /** may be null if unset */
- public static KeyRange getInputKeyRange(Configuration conf){
- String str = conf.get(INPUT_KEYRANGE_CONFIG);
- return null != str ? keyRangeFromString(str) : null;
- }
-
- private static String keyRangeToString(KeyRange keyRange)
- {
- assert keyRange != null;
- TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
- try
- {
- return FBUtilities.bytesToHex(serializer.serialize(keyRange));
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- private static KeyRange keyRangeFromString(String st)
- {
- assert st != null;
- TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
- KeyRange keyRange = new KeyRange();
- try
- {
- deserializer.deserialize(keyRange, FBUtilities.hexToBytes(st));
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- return keyRange;
- }
-
public static String getInputKeyspace(Configuration conf)
{
return conf.get(INPUT_KEYSPACE_CONFIG);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Mon Jul 4 13:02:05 2011
@@ -42,45 +42,27 @@ public class IndexHelper
* @param in the data input from which the bloom filter should be skipped
* @throws IOException
*/
- public static void skipBloomFilter(DataInput in) throws IOException
+ public static void skipBloomFilter(FileDataInput in) throws IOException
{
/* size of the bloom filter */
int size = in.readInt();
/* skip the serialized bloom filter */
- if (in instanceof FileDataInput)
- {
- FileUtils.skipBytesFully(in, size);
- }
- else
- {
- // skip bytes
- byte[] skip = new byte[size];
- in.readFully(skip);
- }
+ FileUtils.skipBytesFully(in, size);
}
- /**
- * Skip the index
- * @param in the data input from which the index should be skipped
- * @throws IOException if an I/O error occurs.
- */
- public static void skipIndex(DataInput in) throws IOException
- {
+ /**
+ * Skip the index
+ * @param file the data input from which the index should be skipped
+ * @throws IOException if an I/O error occurs.
+ */
+ public static void skipIndex(FileDataInput file) throws IOException
+ {
/* read only the column index list */
- int columnIndexSize = in.readInt();
+ int columnIndexSize = file.readInt();
/* skip the column index data */
- if (in instanceof FileDataInput)
- {
- FileUtils.skipBytesFully(in, columnIndexSize);
- }
- else
- {
- // skip bytes
- byte[] skip = new byte[columnIndexSize];
- in.readFully(skip);
- }
- }
-
+ FileUtils.skipBytesFully(file, columnIndexSize);
+ }
+
/**
* Deserialize the index into a structure and return it
*
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Mon Jul 4 13:02:05 2011
@@ -21,11 +21,11 @@ package org.apache.cassandra.io.sstable;
*/
-import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOError;
import java.io.IOException;
+import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
-import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.Filter;
public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
{
@@ -45,16 +45,14 @@ public class SSTableIdentityIterator imp
private final DecoratedKey key;
private final long finishedAt;
- private final DataInput input;
+ private final BufferedRandomAccessFile file;
private final long dataStart;
public final long dataSize;
public final boolean fromRemote;
private final ColumnFamily columnFamily;
public final int columnCount;
- private long columnPosition;
-
- private BytesReadTracker inputWithTracker; // tracks bytes read
+ private final long columnPosition;
// Used by lazilyCompactedRow, so that we see the same things when deserializing the first and second time
private final int expireBefore;
@@ -92,18 +90,17 @@ public class SSTableIdentityIterator imp
this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
}
- public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
+ public SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean fromRemote)
throws IOException
{
this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
}
// sstable may be null *if* deserializeRowHeader is false
- private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+ private SSTableIdentityIterator(CFMetaData metadata, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
throws IOException
{
- this.input = input;
- this.inputWithTracker = new BytesReadTracker(input);
+ this.file = file;
this.key = key;
this.dataStart = dataStart;
this.dataSize = dataSize;
@@ -114,47 +111,38 @@ public class SSTableIdentityIterator imp
try
{
- if (input instanceof BufferedRandomAccessFile)
+ file.seek(this.dataStart);
+ if (checkData)
{
- BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
- file.seek(this.dataStart);
- if (checkData)
+ try
+ {
+ IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
+ }
+ catch (Exception e)
+ {
+ if (e instanceof EOFException)
+ throw (EOFException) e;
+
+ logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
+ // deFreeze should have left the file position ready to deserialize index
+ }
+ try
{
- try
- {
- IndexHelper.defreezeBloomFilter(file, dataSize, sstable.descriptor.usesOldBloomFilter);
- }
- catch (Exception e)
- {
- if (e instanceof EOFException)
- throw (EOFException) e;
-
- logger.debug("Invalid bloom filter in {}; will rebuild it", sstable);
- // deFreeze should have left the file position ready to deserialize index
- }
- try
- {
- IndexHelper.deserializeIndex(file);
- }
- catch (Exception e)
- {
- logger.debug("Invalid row summary in {}; will rebuild it", sstable);
- }
- file.seek(this.dataStart);
+ IndexHelper.deserializeIndex(file);
}
+ catch (Exception e)
+ {
+ logger.debug("Invalid row summary in {}; will rebuild it", sstable);
+ }
+ file.seek(this.dataStart);
}
- IndexHelper.skipBloomFilter(inputWithTracker);
- IndexHelper.skipIndex(inputWithTracker);
+ IndexHelper.skipBloomFilter(file);
+ IndexHelper.skipIndex(file);
columnFamily = ColumnFamily.create(metadata);
- ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, inputWithTracker);
- columnCount = inputWithTracker.readInt();
-
- if (input instanceof BufferedRandomAccessFile)
- {
- BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
- columnPosition = file.getFilePointer();
- }
+ ColumnFamily.serializer().deserializeFromSSTableNoColumns(columnFamily, file);
+ columnCount = file.readInt();
+ columnPosition = file.getFilePointer();
}
catch (IOException e)
{
@@ -174,22 +162,14 @@ public class SSTableIdentityIterator imp
public boolean hasNext()
{
- if (input instanceof BufferedRandomAccessFile)
- {
- BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
- return file.getFilePointer() < finishedAt;
- }
- else
- {
- return inputWithTracker.getBytesRead() < dataSize;
- }
+ return file.getFilePointer() < finishedAt;
}
public IColumn next()
{
try
{
- IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, null, fromRemote, expireBefore);
+ IColumn column = columnFamily.getColumnSerializer().deserialize(file, null, fromRemote, expireBefore);
if (validateColumns)
column.validateFields(columnFamily.metadata());
return column;
@@ -216,50 +196,23 @@ public class SSTableIdentityIterator imp
public String getPath()
{
- // if input is from file, then return that path, otherwise it's from streaming
- if (input instanceof BufferedRandomAccessFile)
- {
- BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
- return file.getPath();
- }
- else
- {
- throw new UnsupportedOperationException();
- }
+ return file.getPath();
}
public void echoData(DataOutput out) throws IOException
{
- // only effective when input is from file
- if (input instanceof BufferedRandomAccessFile)
+ file.seek(dataStart);
+ while (file.getFilePointer() < finishedAt)
{
- BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
- file.seek(dataStart);
- while (file.getFilePointer() < finishedAt)
- {
- out.write(file.readByte());
- }
- }
- else
- {
- throw new UnsupportedOperationException();
+ out.write(file.readByte());
}
}
public ColumnFamily getColumnFamilyWithColumns() throws IOException
{
+ file.seek(columnPosition - 4); // seek to before column count int
ColumnFamily cf = columnFamily.cloneMeShallow();
- if (input instanceof BufferedRandomAccessFile)
- {
- BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
- file.seek(columnPosition - 4); // seek to before column count int
- ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, false, fromRemote);
- }
- else
- {
- // since we already read column count, just pass that value and continue deserialization
- ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, false, fromRemote);
- }
+ ColumnFamily.serializer().deserializeColumns(file, cf, false, fromRemote);
if (validateColumns)
{
try
@@ -281,23 +234,13 @@ public class SSTableIdentityIterator imp
public void reset()
{
- // only effective when input is from file
- if (input instanceof BufferedRandomAccessFile)
+ try
{
- BufferedRandomAccessFile file = (BufferedRandomAccessFile) input;
- try
- {
- file.seek(columnPosition);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- inputWithTracker.reset();
+ file.seek(columnPosition);
}
- else
+ catch (IOException e)
{
- throw new UnsupportedOperationException();
+ throw new IOError(e);
}
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Mon Jul 4 13:02:05 2011
@@ -18,8 +18,6 @@
package org.apache.cassandra.streaming;
-import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -27,27 +25,11 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
-import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
public class IncomingStreamReader
@@ -88,162 +70,51 @@ public class IncomingStreamReader
logger.debug("Receiving stream");
logger.debug("Creating file for {}", localFile.getFilename());
}
+ FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
+ FileChannel fc = fos.getChannel();
- SSTableReader reader = null;
- if (remoteFile.estimatedKeys > 0)
+ long offset = 0;
+ try
{
- logger.debug("Estimated keys {}", remoteFile.estimatedKeys);
- DataInputStream dis = new DataInputStream(socketChannel.socket().getInputStream());
- try
+ for (Pair<Long, Long> section : localFile.sections)
{
- reader = streamIn(dis, localFile, remoteFile);
- }
- catch (IOException ex)
- {
- retry();
- throw ex;
- }
- finally
- {
- dis.close();
+ long length = section.right - section.left;
+ long bytesRead = 0;
+ while (bytesRead < length)
+ {
+ bytesRead = readnwrite(length, bytesRead, offset, fc);
+ }
+ offset += length;
}
}
- else
+ catch (IOException ex)
{
- FileOutputStream fos = new FileOutputStream(localFile.getFilename(), true);
- FileChannel fc = fos.getChannel();
+ /* Ask the source node to re-stream this file. */
+ session.retry(remoteFile);
- long offset = 0;
- try
- {
- for (Pair<Long, Long> section : localFile.sections)
- {
- long length = section.right - section.left;
- long bytesRead = 0;
- while (bytesRead < length)
- {
- bytesRead = readnwrite(length, bytesRead, offset, fc);
- }
- offset += length;
- }
- }
- catch (IOException ex)
- {
- retry();
- throw ex;
- }
- finally
- {
- fc.close();
- }
+ /* Delete the orphaned file. */
+ FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
+ throw ex;
+ }
+ finally
+ {
+ fc.close();
}
- session.finished(remoteFile, localFile, reader);
+ session.finished(remoteFile, localFile);
}
protected long readnwrite(long length, long bytesRead, long offset, FileChannel fc) throws IOException
{
long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length - bytesRead);
long lastRead = fc.transferFrom(socketChannel, offset + bytesRead, toRead);
- // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
- // and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and
- // raise an exception (that will be catch belove and 'the right thing' will be done).
- if (lastRead == 0)
- throw new IOException("Transfer failed for remote file " + remoteFile);
+ // if the other side fails, we will not get an exception, but instead transferFrom will constantly return 0 byte read
+ // and we would thus enter an infinite loop. So intead, if no bytes are tranferred we assume the other side is dead and
+ // raise an exception (that will be catch belove and 'the right thing' will be done).
+ if (lastRead == 0)
+ throw new IOException("Transfer failed for remote file " + remoteFile);
bytesRead += lastRead;
remoteFile.progress += lastRead;
return bytesRead;
}
-
- private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
- {
- ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
- DecoratedKey key;
- SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
- CompactionController controller = null;
-
- BytesReadTracker in = new BytesReadTracker(input);
-
- for (Pair<Long, Long> section : localFile.sections)
- {
- long length = section.right - section.left;
- long bytesRead = 0;
- while (bytesRead < length)
- {
- key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in));
- long dataSize = SSTableReader.readRowSize(in, localFile.desc);
- ColumnFamily cf = null;
- if (cfs.metadata.getDefaultValidator().isCommutative())
- {
- // take care of counter column family
- if (controller == null)
- controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MAX_VALUE, true);
- SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
- AbstractCompactedRow row = controller.getCompactedRow(iter);
- writer.append(row);
-
- if (row instanceof PrecompactedRow)
- {
- // we do not purge so we should not get a null here
- cf = ((PrecompactedRow)row).getFullColumnFamily();
- }
- }
- else
- {
- // skip BloomFilter
- IndexHelper.skipBloomFilter(in);
- // skip Index
- IndexHelper.skipIndex(in);
-
- // restore ColumnFamily
- cf = ColumnFamily.create(cfs.metadata);
- ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, in);
- ColumnFamily.serializer().deserializeColumns(in, cf, true, true);
-
- // write key and cf
- writer.append(key, cf);
- }
-
- // update cache
- ColumnFamily cached = cfs.getRawCachedRow(key);
- if (cached != null)
- {
- switch (remoteFile.type)
- {
- case AES:
- if (dataSize > DatabaseDescriptor.getInMemoryCompactionLimit())
- {
- // We have a key in cache for a very big row, that is fishy. We don't fail here however because that would prevent the sstable
- // from being build (and there is no real point anyway), so we just invalidate the row for correction and log a warning.
- logger.warn("Found a cached row over the in memory compaction limit during post-streaming rebuilt; it is highly recommended to avoid huge row on column family with row cache enabled.");
- cfs.invalidateCachedRow(key);
- }
- else
- {
- assert cf != null;
- cfs.updateRowCache(key, cf);
- }
- break;
- default:
- cfs.invalidateCachedRow(key);
- break;
- }
- }
-
- bytesRead += in.getBytesRead();
- remoteFile.progress += in.getBytesRead();
- }
- }
-
- return writer.closeAndOpenReader();
- }
-
- private void retry() throws IOException
- {
- /* Ask the source node to re-stream this file. */
- session.retry(remoteFile);
-
- /* Delete the orphaned file. */
- FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
- }
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/PendingFile.java Mon Jul 4 13:02:05 2011
@@ -53,21 +53,15 @@ public class PendingFile
public final List<Pair<Long,Long>> sections;
public final OperationType type;
public final long size;
- public final long estimatedKeys;
public long progress;
public PendingFile(Descriptor desc, PendingFile pf)
{
- this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys);
+ this(null, desc, pf.component, pf.sections, pf.type);
}
public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
{
- this(sstable, desc, component, sections, type, 0);
- }
-
- public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type, long estimatedKeys)
- {
this.sstable = sstable;
this.desc = desc;
this.component = component;
@@ -80,8 +74,6 @@ public class PendingFile
tempSize += section.right - section.left;
}
size = tempSize;
-
- this.estimatedKeys = estimatedKeys;
}
public String getFilename()
@@ -127,7 +119,6 @@ public class PendingFile
}
if (version > MessagingService.VERSION_07)
dos.writeUTF(sc.type.name());
- dos.writeLong(sc.estimatedKeys);
}
public PendingFile deserialize(DataInputStream dis, int version) throws IOException
@@ -146,8 +137,7 @@ public class PendingFile
OperationType type = OperationType.RESTORE_REPLICA_COUNT;
if (version > MessagingService.VERSION_07)
type = OperationType.valueOf(dis.readUTF());
- long estimatedKeys = dis.readLong();
- return new PendingFile(null, desc, component, sections, type, estimatedKeys);
+ return new PendingFile(null, desc, component, sections, type);
}
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamInSession.java Mon Jul 4 13:02:05 2011
@@ -51,7 +51,6 @@ public class StreamInSession
private final Runnable callback;
private String table;
private final Collection<Future<SSTableReader>> buildFutures = new LinkedBlockingQueue<Future<SSTableReader>>();
- private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
private PendingFile current;
private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
@@ -103,21 +102,13 @@ public class StreamInSession
}
}
- public void finished(PendingFile remoteFile, PendingFile localFile, SSTableReader reader) throws IOException
+ public void finished(PendingFile remoteFile, PendingFile localFile) throws IOException
{
if (logger.isDebugEnabled())
logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
- if (reader != null)
- {
- // SSTR was already built during streaming
- readers.add(reader);
- }
- else
- {
- Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
- buildFutures.add(future);
- }
+ Future<SSTableReader> future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
+ buildFutures.add(future);
files.remove(remoteFile);
if (remoteFile.equals(current))
@@ -145,7 +136,14 @@ public class StreamInSession
try
{
SSTableReader sstable = future.get();
- readers.add(sstable);
+ assert sstable.getTableName().equals(table);
+ if (sstable == null)
+ continue;
+ ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
+ cfs.addSSTable(sstable);
+ if (!cfstores.containsKey(cfs))
+ cfstores.put(cfs, new ArrayList<SSTableReader>());
+ cfstores.get(cfs).add(sstable);
}
catch (InterruptedException e)
{
@@ -156,18 +154,6 @@ public class StreamInSession
throw new RuntimeException(e);
}
}
-
- for (SSTableReader sstable : readers)
- {
- assert sstable.getTableName().equals(table);
- if (sstable == null)
- continue;
- ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
- cfs.addSSTable(sstable);
- if (!cfstores.containsKey(cfs))
- cfstores.put(cfs, new ArrayList<SSTableReader>());
- cfstores.get(cfs).add(sstable);
- }
// build secondary indexes
for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1142647&r1=1142646&r2=1142647&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOut.java Mon Jul 4 13:02:05 2011
@@ -176,7 +176,7 @@ public class StreamOut
List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
if (sections.isEmpty())
continue;
- pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeys()));
+ pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
}
logger.info("Stream context metadata {}, {} sstables.", pending, sstables.size());
return pending;