You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/11/10 20:18:47 UTC

svn commit: r1200483 - in /cassandra/branches/cassandra-1.0: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/u...

Author: slebresne
Date: Thu Nov 10 19:18:46 2011
New Revision: 1200483

URL: http://svn.apache.org/viewvc?rev=1200483&view=rev
Log:
Fix incorrect size exception during streaming of counters
patch by slebresne; reviewed by jbellis for CASSANDRA-3481

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java
    cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Nov 10 19:18:46 2011
@@ -8,6 +8,7 @@
  * fix reading metadata/statistics component for version < h (CASSANDRA-3474)
  * add sstable forward-compatibility (CASSANDRA-3478)
  * report compression ratio in CFSMBean (CASSANDRA-3393)
+ * fix incorrect size exception during streaming of counters (CASSANDRA-3481)
 Merged from 0.8:
  * Make counter shard merging thread safe (CASSANDRA-3178)
  * fix updating CF row_cache_provider (CASSANDRA-3414)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Nov 10 19:18:46 2011
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.SSTableMetadata;
 
@@ -114,10 +115,10 @@ public class ColumnFamilySerializer impl
 
     public ColumnFamily deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, false, ThreadSafeSortedColumns.factory());
+        return deserialize(dis, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
     }
 
-    public ColumnFamily deserialize(DataInput dis, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+    public ColumnFamily deserialize(DataInput dis, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
     {
         if (!dis.readBoolean())
             return null;
@@ -128,22 +129,22 @@ public class ColumnFamilySerializer impl
             throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
         ColumnFamily cf = ColumnFamily.create(cfId, factory);
         deserializeFromSSTableNoColumns(cf, dis);
-        deserializeColumns(dis, cf, fromRemote);
+        deserializeColumns(dis, cf, flag);
         return cf;
     }
 
-    public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean fromRemote) throws IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, IColumnSerializer.Flag flag) throws IOException
     {
         int size = dis.readInt();
-        deserializeColumns(dis, cf, size, fromRemote);
+        deserializeColumns(dis, cf, size, flag);
     }
 
     /* column count is already read from DataInput */
-    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean fromRemote) throws IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, IColumnSerializer.Flag flag) throws IOException
     {
         for (int i = 0; i < size; ++i)
         {
-            IColumn column = cf.getColumnSerializer().deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+            IColumn column = cf.getColumnSerializer().deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
             cf.addColumn(column);
         }
     }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnSerializer.java Thu Nov 10 19:18:46 2011
@@ -70,7 +70,7 @@ public class ColumnSerializer implements
 
     public Column deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, false);
+        return deserialize(dis, Flag.LOCAL);
     }
 
     /*
@@ -78,12 +78,12 @@ public class ColumnSerializer implements
      * deserialize comes from a remote host. If it does, then we must clear
      * the delta.
      */
-    public Column deserialize(DataInput dis, boolean fromRemote) throws IOException
+    public Column deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
     {
-        return deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+        return deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
     }
 
-    public Column deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+    public Column deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         if (name.remaining() <= 0)
@@ -104,7 +104,7 @@ public class ColumnSerializer implements
             long timestampOfLastDelete = dis.readLong();
             long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            return CounterColumn.create(name, value, ts, timestampOfLastDelete, fromRemote);
+            return CounterColumn.create(name, value, ts, timestampOfLastDelete, flag);
         }
         else if ((b & EXPIRATION_MASK) != 0)
         {
@@ -112,7 +112,7 @@ public class ColumnSerializer implements
             int expiration = dis.readInt();
             long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore);
+            return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore, flag);
         }
         else
         {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterColumn.java Thu Nov 10 19:18:46 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.context.C
 import org.apache.cassandra.db.context.IContext.ContextRelationship;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.service.IWriteResponseHandler;
@@ -76,11 +77,11 @@ public class CounterColumn extends Colum
         this.timestampOfLastDelete = timestampOfLastDelete;
     }
 
-    public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, boolean fromRemote)
+    public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, IColumnSerializer.Flag flag)
     {
         // #elt being negative means we have to clean delta
         short count = value.getShort(value.position());
-        if (fromRemote || count < 0)
+        if (flag == IColumnSerializer.Flag.FROM_REMOTE || (flag == IColumnSerializer.Flag.LOCAL && count < 0))
             value = CounterContext.instance().clearAllDelta(value);
         return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
     }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ExpiringColumn.java Thu Nov 10 19:18:46 2011
@@ -25,6 +25,7 @@ import java.security.MessageDigest;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -60,9 +61,9 @@ public class ExpiringColumn extends Colu
     }
 
     /** @return Either a DeletedColumn, or an ExpiringColumn. */
-    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore)
+    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, IColumnSerializer.Flag flag)
     {
-        if (localExpirationTime >= expireBefore)
+        if (localExpirationTime >= expireBefore || flag == IColumnSerializer.Flag.PRESERVE_SIZE)
             return new ExpiringColumn(name, value, timestamp, timeToLive, localExpirationTime);
         // the column is now expired, we can safely return a simple tombstone
         return new DeletedColumn(name, localExpirationTime, timestamp);

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java Thu Nov 10 19:18:46 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -105,7 +106,7 @@ class ReadResponseSerializer implements 
         if (!isDigest)
         {
             // This is coming from a remote host
-            row = Row.serializer().deserialize(dis, version, true, ArrayBackedSortedColumns.factory());
+            row = Row.serializer().deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
         }
 
         return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java Thu Nov 10 19:18:46 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 
 import java.io.*;
 
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -61,15 +62,15 @@ public class Row
             ColumnFamily.serializer().serialize(row.cf, dos);
         }
 
-        public Row deserialize(DataInput dis, int version, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+        public Row deserialize(DataInput dis, int version, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
         {
             return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
-                           ColumnFamily.serializer().deserialize(dis, fromRemote, factory));
+                           ColumnFamily.serializer().deserialize(dis, flag, factory));
         }
 
         public Row deserialize(DataInput dis, int version) throws IOException
         {
-            return deserialize(dis, version, false, ThreadSafeSortedColumns.factory());
+            return deserialize(dis, version, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
         }
 
         public long serializedSize(Row row, int version)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu Nov 10 19:18:46 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.Message;
@@ -388,7 +389,7 @@ public class RowMutation implements IMut
             }
         }
 
-        public RowMutation deserialize(DataInput dis, int version, boolean fromRemote) throws IOException
+        public RowMutation deserialize(DataInput dis, int version, IColumnSerializer.Flag flag) throws IOException
         {
             String table = dis.readUTF();
             ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
@@ -397,7 +398,7 @@ public class RowMutation implements IMut
             for (int i = 0; i < size; ++i)
             {
                 Integer cfid = Integer.valueOf(dis.readInt());
-                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, fromRemote, ThreadSafeSortedColumns.factory());
+                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, flag, ThreadSafeSortedColumns.factory());
                 modifications.put(cfid, cf);
             }
             return new RowMutation(table, key, modifications);
@@ -405,7 +406,7 @@ public class RowMutation implements IMut
 
         public RowMutation deserialize(DataInput dis, int version) throws IOException
         {
-            return deserialize(dis, version, true);
+            return deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE);
         }
 
         public long serializedSize(RowMutation rm, int version)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SuperColumn.java Thu Nov 10 19:18:46 2011
@@ -336,15 +336,15 @@ class SuperColumnSerializer implements I
 
     public IColumn deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, false);
+        return deserialize(dis, IColumnSerializer.Flag.LOCAL);
     }
 
-    public IColumn deserialize(DataInput dis, boolean fromRemote) throws IOException
+    public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
     {
-        return deserialize(dis, fromRemote, (int)(System.currentTimeMillis() / 1000));
+        return deserialize(dis, flag, (int)(System.currentTimeMillis() / 1000));
     }
 
-    public IColumn deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+    public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         int localDeleteTime = dis.readInt();
@@ -357,7 +357,7 @@ class SuperColumnSerializer implements I
         /* read the number of columns */
         int size = dis.readInt();
         ColumnSerializer serializer = Column.serializer();
-        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, fromRemote, expireBefore);
+        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, flag, expireBefore);
         SuperColumn superColumn = new SuperColumn(name, ThreadSafeSortedColumns.factory().fromSorted(preSortedMap, false));
         superColumn.delete(localDeleteTime, markedForDeleteAt);
         return superColumn;

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Thu Nov 10 19:18:46 2011
@@ -42,6 +42,7 @@ import org.apache.cassandra.concurrent.S
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -281,7 +282,7 @@ public class CommitLog implements Commit
                     {
                         // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
                         // the current version.  so do make sure the CL is drained prior to upgrading a node.
-                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, false);
+                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
                     }
                     catch (UnserializableColumnFamilyException ex)
                     {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/IColumnSerializer.java Thu Nov 10 19:18:46 2011
@@ -27,5 +27,21 @@ import org.apache.cassandra.db.IColumn;
 
 public interface IColumnSerializer extends ISerializer<IColumn>
 {
-    public IColumn deserialize(DataInput in, boolean fromRemote, int expireBefore) throws IOException;
+    /**
+     * Flag affecting deserialization behavior.
+     *  - LOCAL: for deserialization of local data (Expired columns are
+     *      converted to tombstones (to gain disk space)).
+     *  - FROM_REMOTE: for deserialization of data received from remote hosts
+     *      (Expired columns are converted to tombstone and counters have
+     *      their delta cleared)
+     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e,
+     *      when we must ensure that deserializing and reserializing the
+     *      result yield the exact same bytes. Streaming uses this.
+     */
+    public static enum Flag
+    {
+        LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+    }
+
+    public IColumn deserialize(DataInput in, Flag flag, int expireBefore) throws IOException;
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Thu Nov 10 19:18:46 2011
@@ -30,6 +30,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.BytesReadTracker;
 
@@ -41,7 +42,7 @@ public class SSTableIdentityIterator imp
     private final DataInput input;
     private final long dataStart;
     public final long dataSize;
-    public final boolean fromRemote;
+    public final IColumnSerializer.Flag flag;
 
     private final ColumnFamily columnFamily;
     private final int columnCount;
@@ -82,17 +83,17 @@ public class SSTableIdentityIterator imp
     public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData)
     throws IOException
     {
-        this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
+        this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
     }
 
-    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, boolean fromRemote)
+    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
     throws IOException
     {
-        this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
+        this(metadata, file, key, dataStart, dataSize, false, null, flag);
     }
 
     // sstable may be null *if* deserializeRowHeader is false
-    private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+    private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, IColumnSerializer.Flag flag)
     throws IOException
     {
         this.input = input;
@@ -101,7 +102,7 @@ public class SSTableIdentityIterator imp
         this.dataStart = dataStart;
         this.dataSize = dataSize;
         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.validateColumns = checkData;
 
         try
@@ -173,7 +174,7 @@ public class SSTableIdentityIterator imp
     {
         try
         {
-            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, fromRemote, expireBefore);
+            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, flag, expireBefore);
             if (validateColumns)
                 column.validateFields(columnFamily.metadata());
             return column;
@@ -228,7 +229,7 @@ public class SSTableIdentityIterator imp
         assert inputWithTracker.getBytesRead() == headerSize();
         ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false);
         // since we already read column count, just pass that value and continue deserialization
-        ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, fromRemote);
+        ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, flag);
         if (validateColumns)
         {
             try

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Nov 10 19:18:46 2011
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
@@ -231,8 +232,9 @@ public class SSTableWriter extends SSTab
         ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
         for (int i = 0; i < columnCount; i++)
         {
-            // deserialize column with fromRemote false, in order to keep size of streamed column
-            IColumn column = cf.getColumnSerializer().deserialize(in, false, Integer.MIN_VALUE);
+            // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
+            // data size received, so we must reserialize the exact same data
+            IColumn column = cf.getColumnSerializer().deserialize(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE);
             if (column instanceof CounterColumn)
             {
                 column = ((CounterColumn) column).markDeltaToBeCleared();

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Thu Nov 10 19:18:46 2011
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ColumnSerializer;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.io.IColumnSerializer;
 
 /**
  * Facade over a DataInput that contains IColumns in sorted order.
@@ -43,16 +44,16 @@ public class ColumnSortedMap implements 
     private final DataInput dis;
     private final Comparator<ByteBuffer> comparator;
     private final int length;
-    private final boolean fromRemote;
+    private final IColumnSerializer.Flag flag;
     private final int expireBefore;
 
-    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
     {
         this.comparator = comparator;
         this.serializer = serializer;
         this.dis = dis;
         this.length = length;
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.expireBefore = expireBefore;
     }
 
@@ -143,7 +144,7 @@ public class ColumnSortedMap implements 
 
     public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
     {
-        return new ColumnSet(serializer, dis, length, fromRemote, expireBefore);
+        return new ColumnSet(serializer, dis, length, flag, expireBefore);
     }
 }
 
@@ -152,15 +153,15 @@ class ColumnSet implements Set<Map.Entry
     private final ColumnSerializer serializer;
     private final DataInput dis;
     private final int length;
-    private boolean fromRemote;
+    private IColumnSerializer.Flag flag;
     private final int expireBefore;
 
-    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
     {
         this.serializer = serializer;
         this.dis = dis;
         this.length = length;
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.expireBefore = expireBefore;
     }
 
@@ -181,7 +182,7 @@ class ColumnSet implements Set<Map.Entry
 
     public Iterator<Entry<ByteBuffer, IColumn>> iterator()
     {
-        return new ColumnIterator(serializer, dis, length, fromRemote, expireBefore);
+        return new ColumnIterator(serializer, dis, length, flag, expireBefore);
     }
 
     public Object[] toArray()
@@ -234,16 +235,16 @@ class ColumnIterator implements Iterator
     private final ColumnSerializer serializer;
     private final DataInput dis;
     private final int length;
-    private final boolean fromRemote;
+    private final IColumnSerializer.Flag flag;
     private int count = 0;
     private final int expireBefore;
 
-    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
     {
         this.dis = dis;
         this.serializer = serializer;
         this.length = length;
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.expireBefore = expireBefore;
     }
 
@@ -252,7 +253,7 @@ class ColumnIterator implements Iterator
         try
         {
             count++;
-            return serializer.deserialize(dis, fromRemote, expireBefore);
+            return serializer.deserialize(dis, flag, expireBefore);
         }
         catch (IOException e)
         {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Thu Nov 10 19:18:46 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
@@ -129,7 +130,8 @@ public class IncomingStreamReader
                         // need to update row cache
                         if (controller == null)
                             controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MIN_VALUE, true);
-                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+                        // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
+                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
                         PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
                         // We don't expire anything so the row shouldn't be empty
                         assert !row.isEmpty();

Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java (original)
+++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/db/CounterColumnTest.java Thu Nov 10 19:18:46 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.db.context.CounterContext;
 import static org.apache.cassandra.db.context.CounterContext.ContextState;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -295,7 +296,7 @@ public class CounterColumnTest extends S
         assert original.equals(deserialized);
 
         bufIn = new ByteArrayInputStream(serialized, 0, serialized.length);
-        CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), true);
+        CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), IColumnSerializer.Flag.FROM_REMOTE);
         assert deserializedOnRemote.name().equals(original.name());
         assert deserializedOnRemote.total() == original.total();
         assert deserializedOnRemote.value().equals(cc.clearAllDelta(original.value()));

Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1200483&r1=1200482&r2=1200483&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Thu Nov 10 19:18:46 2011
@@ -83,19 +83,13 @@ public class StreamingTransferTest exten
 
         // transfer the first and last key
         logger.debug("Transferring " + cfs.columnFamily);
-        int[] offs = new int[]{1, 3};
-        IPartitioner p = StorageService.getPartitioner();
-        List<Range> ranges = new ArrayList<Range>();
-        ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
-        ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
-        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
-        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
-        session.await();
+        transfer(table, sstable);
 
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
 
         // and that the index and filter were properly recovered
+        int[] offs = new int[]{1, 3};
         List<Row> rows = Util.getRangeSlice(cfs);
         assertEquals(offs.length, rows.size());
         for (int i = 0; i < offs.length; i++)
@@ -119,6 +113,17 @@ public class StreamingTransferTest exten
         return keys;
     }
 
+    private void transfer(Table table, SSTableReader sstable) throws Exception
+    {
+        IPartitioner p = StorageService.getPartitioner();
+        List<Range> ranges = new ArrayList<Range>();
+        ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
+        ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
+        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
+        session.await();
+    }
+
     @Test
     public void testTransferTable() throws Exception
     {
@@ -222,6 +227,12 @@ public class StreamingTransferTest exten
             .write(cleanedEntries);
         SSTableReader streamed = cfs.getSSTables().iterator().next();
         SSTableUtils.assertContentEquals(cleaned, streamed);
+
+        // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
+        cfs.clearUnsafe();
+        transfer(table, streamed);
+        SSTableReader restreamed = cfs.getSSTables().iterator().next();
+        SSTableUtils.assertContentEquals(streamed, restreamed);
     }
 
     @Test
@@ -320,7 +331,7 @@ public class StreamingTransferTest exten
             assertEquals(entry.getKey(), rows.get(0).key);
         }
     }
- 
+
     public interface Mutator
     {
         public void mutate(String key, String col, long timestamp) throws Exception;