You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/08/01 14:41:24 UTC
[3/6] cassandra git commit: Fix incorrect [2.1 <- 3.0] serialization
of counter cells created in 2.0
Fix incorrect [2.1 <- 3.0] serialization of counter cells created in 2.0
Also fixes calculation of legacy counter update cells' serialized size.
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-13691
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ba712897
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ba712897
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ba712897
Branch: refs/heads/trunk
Commit: ba71289778369e71d9abbdb93cb6b91ba67f9c85
Parents: 9dc896f
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Jul 13 14:47:18 2017 -0700
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 1 15:24:55 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 4 +-
.../apache/cassandra/cql3/UpdateParameters.java | 7 ++-
.../org/apache/cassandra/db/LegacyLayout.java | 23 ++++++---
.../cassandra/db/context/CounterContext.java | 52 ++++++++++++++------
.../cassandra/thrift/CassandraServer.java | 4 +-
.../org/apache/cassandra/utils/CounterId.java | 3 +-
.../db/context/CounterContextTest.java | 34 ++++++++++++-
7 files changed, 98 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3f5fe6..4038ac7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Fix incorrect [2.1 <- 3.0] serialization of counter cells created in 2.0 (CASSANDRA-13691)
* Fix invalid writetime for null cells (CASSANDRA-13711)
* Fix ALTER TABLE statement to atomically propagate changes to the table and its MVs (CASSANDRA-12952)
* Fixed ambiguous output of nodetool tablestats command (CASSANDRA-13722)
@@ -19,6 +20,7 @@
Merged from 2.1:
* Clone HeartBeatState when building gossip messages. Make its generation/version volatile (CASSANDRA-13700)
+
3.0.14
* Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172)
* Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
@@ -45,6 +47,7 @@ Merged from 2.2:
* Nodes started with join_ring=False should be able to serve requests when authentication is enabled (CASSANDRA-11381)
* cqlsh COPY FROM: increment error count only for failures, not for attempts (CASSANDRA-13209)
+
3.0.13
* Make reading of range tombstones more reliable (CASSANDRA-12811)
* Fix startup problems due to schema tables not completely flushed (CASSANDRA-12213)
@@ -170,7 +173,6 @@ Merged from 2.1:
* cqlsh copy-from: sort user type fields in csv (CASSANDRA-12959)
-
3.0.10
* Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
* Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index d902dec..8ff5344 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -162,12 +162,15 @@ public class UpdateParameters
// "counter update", which is a temporary state until we run into 'CounterMutation.updateWithCurrentValue()'
// which does the read-before-write and sets the proper CounterId, clock and updated value.
//
- // We thus create a "fake" local shard here. The CounterId/clock used don't matter as this is just a temporary
+ // We thus create a "fake" local shard here. The clock used doesn't matter as this is just a temporary
// state that will be replaced when processing the mutation in CounterMutation, but the reason we use a 'local'
// shard is due to the merging rules: if a user includes multiple updates to the same counter in a batch, those
// multiple updates will be merged in the PartitionUpdate *before* they even reach CounterMutation. So we need
// such update to be added together, and that's what a local shard gives us.
- builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createLocal(increment)));
+ //
+ // We set counterid to a special value to differentiate between regular pre-2.0 local shards from pre-2.1 era
+ // and "counter update" temporary state cells. Please see CounterContext.createUpdate() for further details.
+ builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createUpdate(increment)));
}
public void setComplexDeletionTime(ColumnDefinition column)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 669fb1c..4f7bc22 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -511,12 +511,12 @@ public abstract class LegacyLayout
{
size += ByteBufferUtil.serializedSizeWithShortLength(cell.name.encode(partition.metadata()));
size += 1; // serialization flags
- if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING)
+ if (cell.isExpiring())
{
size += TypeSizes.sizeof(cell.ttl);
size += TypeSizes.sizeof(cell.localDeletionTime);
}
- else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED)
+ else if (cell.isTombstone())
{
size += TypeSizes.sizeof(cell.timestamp);
// localDeletionTime replaces cell.value as the body
@@ -524,7 +524,14 @@ public abstract class LegacyLayout
size += TypeSizes.sizeof(cell.localDeletionTime);
continue;
}
- else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER)
+ else if (cell.isCounterUpdate())
+ {
+ size += TypeSizes.sizeof(cell.timestamp);
+ long count = CounterContext.instance().getLocalCount(cell.value);
+ size += ByteBufferUtil.serializedSizeWithLength(ByteBufferUtil.bytes(count));
+ continue;
+ }
+ else if (cell.isCounter())
{
size += TypeSizes.sizeof(Long.MIN_VALUE); // timestampOfLastDelete
}
@@ -1073,7 +1080,7 @@ public abstract class LegacyLayout
ByteBuffer value = ByteBufferUtil.readWithLength(in);
LegacyCellName name = decodeCellName(metadata, cellname, readAllAsDynamic);
return (mask & COUNTER_UPDATE_MASK) != 0
- ? new LegacyCell(LegacyCell.Kind.COUNTER, name, CounterContext.instance().createLocal(ByteBufferUtil.toLong(value)), ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
+ ? new LegacyCell(LegacyCell.Kind.COUNTER, name, CounterContext.instance().createUpdate(ByteBufferUtil.toLong(value)), ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
: ((mask & DELETION_MASK) == 0
? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
: new LegacyCell(LegacyCell.Kind.DELETED, name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), Cell.NO_TTL));
@@ -1489,11 +1496,11 @@ public abstract class LegacyLayout
return new LegacyCell(Kind.DELETED, decodeCellName(metadata, superColumnName, name), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, nowInSec, LivenessInfo.NO_TTL);
}
- public static LegacyCell counter(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long value)
+ public static LegacyCell counterUpdate(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long value)
throws UnknownColumnException
{
// See UpdateParameters.addCounter() for more details on this
- ByteBuffer counterValue = CounterContext.instance().createLocal(value);
+ ByteBuffer counterValue = CounterContext.instance().createUpdate(value);
return counter(decodeCellName(metadata, superColumnName, name), counterValue);
}
@@ -1515,10 +1522,10 @@ public abstract class LegacyLayout
return 0;
}
- private boolean isCounterUpdate()
+ public boolean isCounterUpdate()
{
// See UpdateParameters.addCounter() for more details on this
- return isCounter() && CounterContext.instance().isLocal(value);
+ return isCounter() && CounterContext.instance().isUpdate(value);
}
public ClusteringPrefix clustering()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index e3f4dfd..29e5cfc 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -81,6 +81,14 @@ public class CounterContext
private static final int COUNT_LENGTH = TypeSizes.sizeof(Long.MAX_VALUE);
private static final int STEP_LENGTH = CounterId.LENGTH + CLOCK_LENGTH + COUNT_LENGTH;
+ /*
+ * A special hard-coded value we use for clock ids to differentiate between regular local shards
+ * and 'fake' local shards used to emulate pre-3.0 CounterUpdateCell-s in UpdateParameters.
+ *
+ * Important for handling counter writes and reads during rolling 2.1/2.2 -> 3.0 upgrades.
+ */
+ static final CounterId UPDATE_CLOCK_ID = CounterId.fromInt(0);
+
private static final Logger logger = LoggerFactory.getLogger(CounterContext.class);
public static enum Relationship
@@ -100,6 +108,35 @@ public class CounterContext
}
/**
+ * Creates a counter context with a single local shard with clock id of UPDATE_CLOCK_ID.
+ *
+ * This is only used in a PartitionUpdate until the update has gone through
+ * CounterMutation.apply(), at which point this special local shard will be replaced by a regular global one.
+ * It should never hit commitlog / memtable / disk, but can hit network.
+ *
+ * We use this so that if an update statement has multiple increments of the same counter we properly
+ * add them rather than keeping only one of them.
+ *
+ * NOTE: Before CASSANDRA-13691 we used a regular local shard without a hard-coded clock id value here.
+ * It was problematic, because it was possible to return a false positive, and on read path encode an old counter
+ * cell from 2.0 era with a regular local shard as a counter update, and to break the 2.1 coordinator.
+ */
+ public ByteBuffer createUpdate(long count)
+ {
+ ContextState state = ContextState.allocate(0, 1, 0);
+ state.writeLocal(UPDATE_CLOCK_ID, 1L, count);
+ return state.context;
+ }
+
+ /**
+ * Checks if a context is an update (see createUpdate() for justification).
+ */
+ public boolean isUpdate(ByteBuffer context)
+ {
+ return ContextState.wrap(context).getCounterId().equals(UPDATE_CLOCK_ID);
+ }
+
+ /**
* Creates a counter context with a single global, 2.1+ shard (a result of increment).
*/
public ByteBuffer createGlobal(CounterId id, long clock, long count)
@@ -111,12 +148,7 @@ public class CounterContext
/**
* Creates a counter context with a single local shard.
- * This is only used in a PartitionUpdate until the update has gone through
- * CounterMutation.apply(), at which point all the local shard are replaced by
- * global ones. In other words, local shards should never hit the disk or
- * memtables. And we use this so that if an update statement has multiple increment
- * of the same counter we properly add them rather than keeping only one of them.
- * (this is also used for tests of compatibility with pre-2.1 counters)
+ * For use by tests of compatibility with pre-2.1 counters only.
*/
public ByteBuffer createLocal(long count)
{
@@ -682,14 +714,6 @@ public class CounterContext
}
/**
- * Checks if a context is local
- */
- public boolean isLocal(ByteBuffer context)
- {
- return ContextState.wrap(context).isLocal();
- }
-
- /**
* Returns the clock and the count associated with the given counter id, or (0, 0) if no such shard is present.
*/
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 0dec94e..86caac3 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1016,7 +1016,7 @@ public class CassandraServer implements Cassandra.Iface
private LegacyLayout.LegacyCell toCounterLegacyCell(CFMetaData metadata, ByteBuffer superColumnName, CounterColumn column)
throws UnknownColumnException
{
- return LegacyLayout.LegacyCell.counter(metadata, superColumnName, column.name, column.value);
+ return LegacyLayout.LegacyCell.counterUpdate(metadata, superColumnName, column.name, column.value);
}
private void sortAndMerge(CFMetaData metadata, List<LegacyLayout.LegacyCell> cells, int nowInSec)
@@ -2169,7 +2169,7 @@ public class CassandraServer implements Cassandra.Iface
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
// See UpdateParameters.addCounter() for more details on this
- ByteBuffer value = CounterContext.instance().createLocal(column.value);
+ ByteBuffer value = CounterContext.instance().createUpdate(column.value);
CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/src/java/org/apache/cassandra/utils/CounterId.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CounterId.java b/src/java/org/apache/cassandra/utils/CounterId.java
index 2552178..690d4aa 100644
--- a/src/java/org/apache/cassandra/utils/CounterId.java
+++ b/src/java/org/apache/cassandra/utils/CounterId.java
@@ -46,10 +46,11 @@ public class CounterId implements Comparable<CounterId>
}
/**
- * Function for test purposes, do not use otherwise.
* Pack an int in a valid CounterId so that the resulting ids respects the
* numerical ordering. Used for creating handcrafted but easy to
* understand contexts in unit tests (see CounterContextTest).
+ *
+ * Also used to generate a special ID for special-case update contexts (see CounterContext.createUpdate()).
*/
public static CounterId fromInt(int n)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ba712897/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
index 4f587c6..a8852f7 100644
--- a/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
+++ b/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
@@ -28,17 +28,19 @@ import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ClockAndCount;
+import org.apache.cassandra.db.LegacyLayout.LegacyCell;
import org.apache.cassandra.db.context.CounterContext.Relationship;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
-import static org.apache.cassandra.db.context.CounterContext.ContextState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.db.context.CounterContext.ContextState;
+
public class CounterContextTest
{
private static final CounterContext cc = new CounterContext();
@@ -542,4 +544,34 @@ public class CounterContextTest
assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(15)));
assertEquals(ClockAndCount.create(0L, 0L), cc.getClockAndCountOf(state.context, CounterId.fromInt(20)));
}
+
+ @Test // see CASSANDRA-13691
+ public void testCounterUpdate()
+ {
+ /*
+ * a context with just one 'update' shard - a local shard with a hardcoded value of CounterContext.UPDATE_CLOCK_ID
+ */
+
+ ByteBuffer updateContext = CounterContext.instance().createUpdate(10L);
+
+ assertEquals(ClockAndCount.create(1L, 10L), cc.getClockAndCountOf(updateContext, CounterContext.UPDATE_CLOCK_ID));
+ assertTrue(cc.isUpdate(updateContext));
+ LegacyCell updateCell = LegacyCell.counter(null, updateContext);
+ assertTrue(updateCell.isCounterUpdate());
+
+
+ /*
+ * a context with a regular local shard sorting first and a couple others in it - should *not* be identified as an update
+ */
+
+ ContextState notUpdateContextState = ContextState.allocate(1, 1, 1);
+ notUpdateContextState.writeLocal( CounterId.fromInt(1), 1L, 10L);
+ notUpdateContextState.writeRemote(CounterId.fromInt(2), 1L, 10L);
+ notUpdateContextState.writeGlobal(CounterId.fromInt(3), 1L, 10L);
+ ByteBuffer notUpdateContext = notUpdateContextState.context;
+
+ assertFalse(cc.isUpdate(notUpdateContext));
+ LegacyCell notUpdateCell = LegacyCell.counter(null, notUpdateContext);
+ assertFalse(notUpdateCell.isCounterUpdate());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org