You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/10 20:18:47 UTC
svn commit: r1200483 - in /cassandra/branches/cassandra-1.0: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/
src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/io/u...
Author: slebresne
Date: Thu Nov 10 19:18:46 2011
New Revision: 1200483
URL: http://svn.apache.org/viewvc?rev=1200483&view=rev
Log:
Fix incorrect size exception during streaming of counters
patch by slebresne; reviewed by jbellis for CASSANDRA-3481
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Nov 10 19:18:46 2011
@@ -8,6 +8,7 @@
* fix reading metadata/statistics component for version < h (CASSANDRA-3474)
* add sstable forward-compatibility (CASSANDRA-3478)
* report compression ratio in CFSMBean (CASSANDRA-3393)
+ * fix incorrect size exception during streaming of counters (CASSANDRA-3481)
Merged from 0.8:
* Make counter shard merging thread safe (CASSANDRA-3178)
* fix updating CF row_cache_provider (CASSANDRA-3414)
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Nov 10 19:18:46 2011
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.SSTableMetadata;
@@ -114,10 +115,10 @@ public class ColumnFamilySerializer impl
public ColumnFamily deserialize(DataInput dis) throws IOException
{
- return deserialize(dis, false, ThreadSafeSortedColumns.factory());
+ return deserialize(dis, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
}
- public ColumnFamily deserialize(DataInput dis, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+ public ColumnFamily deserialize(DataInput dis, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
{
if (!dis.readBoolean())
return null;
@@ -128,22 +129,22 @@ public class ColumnFamilySerializer impl
throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
ColumnFamily cf = ColumnFamily.create(cfId, factory);
deserializeFromSSTableNoColumns(cf, dis);
- deserializeColumns(dis, cf, fromRemote);
+ deserializeColumns(dis, cf, flag);
return cf;
}
- public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean fromRemote) throws IOException
+ public void deserializeColumns(DataInput dis, ColumnFamily cf, IColumnSerializer.Flag flag) throws IOException
{
int size = dis.readInt();
- deserializeColumns(dis, cf, size, fromRemote);
+ deserializeColumns(dis, cf, size, flag);
}
/* column count is already read from DataInput */
- public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean fromRemote) throws IOException
+ public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, IColumnSerializer.Flag flag) throws IOException
{
for (int i = 0; i < size; ++i)
{
- IColumn column = cf.getColumnSerializer().deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+ IColumn column = cf.getColumnSerializer().deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
cf.addColumn(column);
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java Thu Nov 10 19:18:46 2011
@@ -70,7 +70,7 @@ public class ColumnSerializer implements
public Column deserialize(DataInput dis) throws IOException
{
- return deserialize(dis, false);
+ return deserialize(dis, Flag.LOCAL);
}
/*
@@ -78,12 +78,12 @@ public class ColumnSerializer implements
* deserialize comes from a remote host. If it does, then we must clear
* the delta.
*/
- public Column deserialize(DataInput dis, boolean fromRemote) throws IOException
+ public Column deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
{
- return deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+ return deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
}
- public Column deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+ public Column deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
{
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
if (name.remaining() <= 0)
@@ -104,7 +104,7 @@ public class ColumnSerializer implements
long timestampOfLastDelete = dis.readLong();
long ts = dis.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(dis);
- return CounterColumn.create(name, value, ts, timestampOfLastDelete, fromRemote);
+ return CounterColumn.create(name, value, ts, timestampOfLastDelete, flag);
}
else if ((b & EXPIRATION_MASK) != 0)
{
@@ -112,7 +112,7 @@ public class ColumnSerializer implements
int expiration = dis.readInt();
long ts = dis.readLong();
ByteBuffer value = ByteBufferUtil.readWithLength(dis);
- return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore);
+ return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore, flag);
}
else
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java Thu Nov 10 19:18:46 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.context.C
import org.apache.cassandra.db.context.IContext.ContextRelationship;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.service.IWriteResponseHandler;
@@ -76,11 +77,11 @@ public class CounterColumn extends Colum
this.timestampOfLastDelete = timestampOfLastDelete;
}
- public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, boolean fromRemote)
+ public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, IColumnSerializer.Flag flag)
{
// #elt being negative means we have to clean delta
short count = value.getShort(value.position());
- if (fromRemote || count < 0)
+ if (flag == IColumnSerializer.Flag.FROM_REMOTE || (flag == IColumnSerializer.Flag.LOCAL && count < 0))
value = CounterContext.instance().clearAllDelta(value);
return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java Thu Nov 10 19:18:46 2011
@@ -25,6 +25,7 @@ import java.security.MessageDigest;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -60,9 +61,9 @@ public class ExpiringColumn extends Colu
}
/** @return Either a DeletedColumn, or an ExpiringColumn. */
- public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore)
+ public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, IColumnSerializer.Flag flag)
{
- if (localExpirationTime >= expireBefore)
+ if (localExpirationTime >= expireBefore || flag == IColumnSerializer.Flag.PRESERVE_SIZE)
return new ExpiringColumn(name, value, timestamp, timeToLive, localExpirationTime);
// the column is now expired, we can safely return a simple tombstone
return new DeletedColumn(name, localExpirationTime, timestamp);
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java Thu Nov 10 19:18:46 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -105,7 +106,7 @@ class ReadResponseSerializer implements
if (!isDigest)
{
// This is coming from a remote host
- row = Row.serializer().deserialize(dis, version, true, ArrayBackedSortedColumns.factory());
+ row = Row.serializer().deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
}
return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java Thu Nov 10 19:18:46 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.io.*;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -61,15 +62,15 @@ public class Row
ColumnFamily.serializer().serialize(row.cf, dos);
}
- public Row deserialize(DataInput dis, int version, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+ public Row deserialize(DataInput dis, int version, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
{
return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
- ColumnFamily.serializer().deserialize(dis, fromRemote, factory));
+ ColumnFamily.serializer().deserialize(dis, flag, factory));
}
public Row deserialize(DataInput dis, int version) throws IOException
{
- return deserialize(dis, version, false, ThreadSafeSortedColumns.factory());
+ return deserialize(dis, version, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
}
public long serializedSize(Row row, int version)
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu Nov 10 19:18:46 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.Message;
@@ -388,7 +389,7 @@ public class RowMutation implements IMut
}
}
- public RowMutation deserialize(DataInput dis, int version, boolean fromRemote) throws IOException
+ public RowMutation deserialize(DataInput dis, int version, IColumnSerializer.Flag flag) throws IOException
{
String table = dis.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
@@ -397,7 +398,7 @@ public class RowMutation implements IMut
for (int i = 0; i < size; ++i)
{
Integer cfid = Integer.valueOf(dis.readInt());
- ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, fromRemote, ThreadSafeSortedColumns.factory());
+ ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, flag, ThreadSafeSortedColumns.factory());
modifications.put(cfid, cf);
}
return new RowMutation(table, key, modifications);
@@ -405,7 +406,7 @@ public class RowMutation implements IMut
public RowMutation deserialize(DataInput dis, int version) throws IOException
{
- return deserialize(dis, version, true);
+ return deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE);
}
public long serializedSize(RowMutation rm, int version)
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java Thu Nov 10 19:18:46 2011
@@ -336,15 +336,15 @@ class SuperColumnSerializer implements I
public IColumn deserialize(DataInput dis) throws IOException
{
- return deserialize(dis, false);
+ return deserialize(dis, IColumnSerializer.Flag.LOCAL);
}
- public IColumn deserialize(DataInput dis, boolean fromRemote) throws IOException
+ public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
{
- return deserialize(dis, fromRemote, (int)(System.currentTimeMillis() / 1000));
+ return deserialize(dis, flag, (int)(System.currentTimeMillis() / 1000));
}
- public IColumn deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+ public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
{
ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
int localDeleteTime = dis.readInt();
@@ -357,7 +357,7 @@ class SuperColumnSerializer implements I
/* read the number of columns */
int size = dis.readInt();
ColumnSerializer serializer = Column.serializer();
- ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, fromRemote, expireBefore);
+ ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, flag, expireBefore);
SuperColumn superColumn = new SuperColumn(name, ThreadSafeSortedColumns.factory().fromSorted(preSortedMap, false));
superColumn.delete(localDeleteTime, markedForDeleteAt);
return superColumn;
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Nov 10 19:18:46 2011
@@ -42,6 +42,7 @@ import org.apache.cassandra.concurrent.S
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -281,7 +282,7 @@ public class CommitLog implements Commit
{
// assuming version here. We've gone to lengths to make sure what gets written to the CL is in
// the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, false);
+ rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
}
catch (UnserializableColumnFamilyException ex)
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java Thu Nov 10 19:18:46 2011
@@ -27,5 +27,21 @@ import org.apache.cassandra.db.IColumn;
public interface IColumnSerializer extends ISerializer<IColumn>
{
- public IColumn deserialize(DataInput in, boolean fromRemote, int expireBefore) throws IOException;
+ /**
+ * Flag affecting deserialization behavior.
+ * - LOCAL: for deserialization of local data (Expired columns are
+ * converted to tombstones (to gain disk space)).
+ * - FROM_REMOTE: for deserialization of data received from remote hosts
+ * (Expired columns are converted to tombstones and counters have
+ * their delta cleared)
+ * - PRESERVE_SIZE: used when no transformation must be performed, i.e.,
+ * when we must ensure that deserializing and reserializing the
+ * result yield the exact same bytes. Streaming uses this.
+ */
+ public static enum Flag
+ {
+ LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+ }
+
+ public IColumn deserialize(DataInput in, Flag flag, int expireBefore) throws IOException;
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Thu Nov 10 19:18:46 2011
@@ -30,6 +30,7 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.BytesReadTracker;
@@ -41,7 +42,7 @@ public class SSTableIdentityIterator imp
private final DataInput input;
private final long dataStart;
public final long dataSize;
- public final boolean fromRemote;
+ public final IColumnSerializer.Flag flag;
private final ColumnFamily columnFamily;
private final int columnCount;
@@ -82,17 +83,17 @@ public class SSTableIdentityIterator imp
public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData)
throws IOException
{
- this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
+ this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
}
- public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, boolean fromRemote)
+ public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
throws IOException
{
- this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
+ this(metadata, file, key, dataStart, dataSize, false, null, flag);
}
// sstable may be null *if* deserializeRowHeader is false
- private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+ private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, IColumnSerializer.Flag flag)
throws IOException
{
this.input = input;
@@ -101,7 +102,7 @@ public class SSTableIdentityIterator imp
this.dataStart = dataStart;
this.dataSize = dataSize;
this.expireBefore = (int)(System.currentTimeMillis() / 1000);
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.validateColumns = checkData;
try
@@ -173,7 +174,7 @@ public class SSTableIdentityIterator imp
{
try
{
- IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, fromRemote, expireBefore);
+ IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, flag, expireBefore);
if (validateColumns)
column.validateFields(columnFamily.metadata());
return column;
@@ -228,7 +229,7 @@ public class SSTableIdentityIterator imp
assert inputWithTracker.getBytesRead() == headerSize();
ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false);
// since we already read column count, just pass that value and continue deserialization
- ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, fromRemote);
+ ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, flag);
if (validateColumns)
{
try
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Nov 10 19:18:46 2011
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.StorageService;
@@ -231,8 +232,9 @@ public class SSTableWriter extends SSTab
ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
for (int i = 0; i < columnCount; i++)
{
- // deserialize column with fromRemote false, in order to keep size of streamed column
- IColumn column = cf.getColumnSerializer().deserialize(in, false, Integer.MIN_VALUE);
+ // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
+ // data size received, so we must reserialize the exact same data
+ IColumn column = cf.getColumnSerializer().deserialize(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE);
if (column instanceof CounterColumn)
{
column = ((CounterColumn) column).markDeltaToBeCleared();
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Thu Nov 10 19:18:46 2011
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.io.IColumnSerializer;
/**
* Facade over a DataInput that contains IColumns in sorted order.
@@ -43,16 +44,16 @@ public class ColumnSortedMap implements
private final DataInput dis;
private final Comparator<ByteBuffer> comparator;
private final int length;
- private final boolean fromRemote;
+ private final IColumnSerializer.Flag flag;
private final int expireBefore;
- public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+ public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
{
this.comparator = comparator;
this.serializer = serializer;
this.dis = dis;
this.length = length;
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.expireBefore = expireBefore;
}
@@ -143,7 +144,7 @@ public class ColumnSortedMap implements
public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
{
- return new ColumnSet(serializer, dis, length, fromRemote, expireBefore);
+ return new ColumnSet(serializer, dis, length, flag, expireBefore);
}
}
@@ -152,15 +153,15 @@ class ColumnSet implements Set<Map.Entry
private final ColumnSerializer serializer;
private final DataInput dis;
private final int length;
- private boolean fromRemote;
+ private IColumnSerializer.Flag flag;
private final int expireBefore;
- public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+ public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
{
this.serializer = serializer;
this.dis = dis;
this.length = length;
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.expireBefore = expireBefore;
}
@@ -181,7 +182,7 @@ class ColumnSet implements Set<Map.Entry
public Iterator<Entry<ByteBuffer, IColumn>> iterator()
{
- return new ColumnIterator(serializer, dis, length, fromRemote, expireBefore);
+ return new ColumnIterator(serializer, dis, length, flag, expireBefore);
}
public Object[] toArray()
@@ -234,16 +235,16 @@ class ColumnIterator implements Iterator
private final ColumnSerializer serializer;
private final DataInput dis;
private final int length;
- private final boolean fromRemote;
+ private final IColumnSerializer.Flag flag;
private int count = 0;
private final int expireBefore;
- public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+ public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
{
this.dis = dis;
this.serializer = serializer;
this.length = length;
- this.fromRemote = fromRemote;
+ this.flag = flag;
this.expireBefore = expireBefore;
}
@@ -252,7 +253,7 @@ class ColumnIterator implements Iterator
try
{
count++;
- return serializer.deserialize(dis, fromRemote, expireBefore);
+ return serializer.deserialize(dis, flag, expireBefore);
}
catch (IOException e)
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Thu Nov 10 19:18:46 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
@@ -129,7 +130,8 @@ public class IncomingStreamReader
// need to update row cache
if (controller == null)
controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MIN_VALUE, true);
- SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+ // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrary to what appendFromStream does below
+ SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
// We don't expire anything so the row shouldn't be empty
assert !row.isEmpty();
Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java (original)
+++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java Thu Nov 10 19:18:46 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.db.context.CounterContext;
import static org.apache.cassandra.db.context.CounterContext.ContextState;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.utils.Allocator;
import org.apache.cassandra.utils.HeapAllocator;
@@ -295,7 +296,7 @@ public class CounterColumnTest extends S
assert original.equals(deserialized);
bufIn = new ByteArrayInputStream(serialized, 0, serialized.length);
- CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), true);
+ CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), IColumnSerializer.Flag.FROM_REMOTE);
assert deserializedOnRemote.name().equals(original.name());
assert deserializedOnRemote.total() == original.total();
assert deserializedOnRemote.value().equals(cc.clearAllDelta(original.value()));
Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Thu Nov 10 19:18:46 2011
@@ -83,19 +83,13 @@ public class StreamingTransferTest exten
// transfer the first and last key
logger.debug("Transferring " + cfs.columnFamily);
- int[] offs = new int[]{1, 3};
- IPartitioner p = StorageService.getPartitioner();
- List<Range> ranges = new ArrayList<Range>();
- ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
- ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
- StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
- StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
- session.await();
+ transfer(table, sstable);
// confirm that a single SSTable was transferred and registered
assertEquals(1, cfs.getSSTables().size());
// and that the index and filter were properly recovered
+ int[] offs = new int[]{1, 3};
List<Row> rows = Util.getRangeSlice(cfs);
assertEquals(offs.length, rows.size());
for (int i = 0; i < offs.length; i++)
@@ -119,6 +113,17 @@ public class StreamingTransferTest exten
return keys;
}
+ private void transfer(Table table, SSTableReader sstable) throws Exception
+ {
+ IPartitioner p = StorageService.getPartitioner();
+ List<Range> ranges = new ArrayList<Range>();
+ ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
+ ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+ StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
+ StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
+ session.await();
+ }
+
@Test
public void testTransferTable() throws Exception
{
@@ -222,6 +227,12 @@ public class StreamingTransferTest exten
.write(cleanedEntries);
SSTableReader streamed = cfs.getSSTables().iterator().next();
SSTableUtils.assertContentEquals(cleaned, streamed);
+
+ // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
+ cfs.clearUnsafe();
+ transfer(table, streamed);
+ SSTableReader restreamed = cfs.getSSTables().iterator().next();
+ SSTableUtils.assertContentEquals(streamed, restreamed);
}
@Test
@@ -320,7 +331,7 @@ public class StreamingTransferTest exten
assertEquals(entry.getKey(), rows.get(0).key);
}
}
-
+
public interface Mutator
{
public void mutate(String key, String col, long timestamp) throws Exception;