You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/01/09 07:12:08 UTC
[1/2] cassandra git commit: Make PartitionUpdate and Mutation
immutable
Repository: cassandra
Updated Branches:
refs/heads/trunk 45c7c4561 -> de7c24b39
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index a663051..530a03b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.schema.TableMetadataRef;
class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
{
protected DecoratedKey currentKey;
- protected PartitionUpdate update;
+ protected PartitionUpdate.Builder update;
private SSTableTxnWriter writer;
@@ -56,7 +56,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
return writer;
}
- PartitionUpdate getUpdateFor(DecoratedKey key) throws IOException
+ PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException
{
assert key != null;
@@ -65,9 +65,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
if (!key.equals(currentKey))
{
if (update != null)
- writePartition(update);
+ writePartition(update.build());
currentKey = key;
- update = new PartitionUpdate(metadata.get(), currentKey, columns, 4);
+ update = new PartitionUpdate.Builder(metadata.get(), currentKey, columns, 4);
}
assert update != null;
@@ -79,7 +79,7 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
try
{
if (update != null)
- writePartition(update);
+ writePartition(update.build());
if (writer != null)
writer.finish(false);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index cd03a40..9d86feb 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.google.common.collect.*;
import com.google.common.collect.Maps;
@@ -356,15 +357,15 @@ public final class SchemaKeyspace
static Collection<Mutation> convertSchemaToMutations()
{
- Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
+ Map<DecoratedKey, Mutation.PartitionUpdateCollector> mutationMap = new HashMap<>();
for (String table : ALL)
convertSchemaToMutations(mutationMap, table);
- return mutationMap.values();
+ return mutationMap.values().stream().map(Mutation.PartitionUpdateCollector::build).collect(Collectors.toList());
}
- private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName)
+ private static void convertSchemaToMutations(Map<DecoratedKey, Mutation.PartitionUpdateCollector> mutationMap, String schemaTableName)
{
ReadCommand cmd = getReadCommandForTableSchema(schemaTableName);
try (ReadExecutionController executionController = cmd.executionController();
@@ -378,9 +379,8 @@ public final class SchemaKeyspace
continue;
DecoratedKey key = partition.partitionKey();
- Mutation mutation = mutationMap.computeIfAbsent(key, k -> new Mutation(SchemaConstants.SCHEMA_KEYSPACE_NAME, key));
-
- mutation.add(makeUpdateForSchema(partition, cmd.columnFilter()));
+ Mutation.PartitionUpdateCollector puCollector = mutationMap.computeIfAbsent(key, k -> new Mutation.PartitionUpdateCollector(SchemaConstants.SCHEMA_KEYSPACE_NAME, key));
+ puCollector.add(makeUpdateForSchema(partition, cmd.columnFilter()));
}
}
}
@@ -423,7 +423,7 @@ public final class SchemaKeyspace
@SuppressWarnings("unchecked")
private static DecoratedKey decorate(TableMetadata metadata, Object value)
{
- return metadata.partitioner.decorateKey(((AbstractType)metadata.partitionKeyType).decompose(value));
+ return metadata.partitioner.decorateKey(((AbstractType) metadata.partitionKeyType).decompose(value));
}
static Mutation.SimpleBuilder makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 933014f..54f4b0c 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -207,7 +207,7 @@ public class DataResolver extends ResponseResolver
private final DecoratedKey partitionKey;
private final RegularAndStaticColumns columns;
private final boolean isReversed;
- private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length];
+ private final PartitionUpdate.Builder[] repairs = new PartitionUpdate.Builder[sources.length];
private final Row.Builder[] currentRows = new Row.Builder[sources.length];
private final RowDiffListener diffListener;
@@ -268,10 +268,10 @@ public class DataResolver extends ResponseResolver
};
}
- private PartitionUpdate update(int i)
+ private PartitionUpdate.Builder update(int i)
{
if (repairs[i] == null)
- repairs[i] = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
+ repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
return repairs[i];
}
@@ -468,7 +468,7 @@ public class DataResolver extends ResponseResolver
{
for (int i = 0; i < repairs.length; i++)
if (null != repairs[i])
- sendRepairMutation(repairs[i], sources[i]);
+ sendRepairMutation(repairs[i].build(), sources[i]);
}
private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 6d672f5..422eaa8 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -59,8 +59,8 @@ public class Commit
public static Commit newProposal(UUID ballot, PartitionUpdate update)
{
- update.updateAllTimestamp(UUIDGen.microsTimestamp(ballot));
- return new Commit(ballot, update);
+ PartitionUpdate withNewTimestamp = new PartitionUpdate.Builder(update, 0).updateAllTimestamp(UUIDGen.microsTimestamp(ballot)).build();
+ return new Commit(ballot, withNewTimestamp);
}
public static Commit emptyCommit(DecoratedKey key, TableMetadata metadata)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index e5c5727..b553a12 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -170,7 +170,9 @@ public class ColumnFamilyStoreTest
ByteBuffer val = ByteBufferUtil.bytes("val1");
// insert
- ColumnMetadata newCol = ColumnMetadata.regularColumn(cfs.metadata(), ByteBufferUtil.bytes("val2"), AsciiType.instance);
+ Mutation.SimpleBuilder builder = Mutation.simpleBuilder(keyspaceName, cfs.metadata().partitioner.decorateKey(ByteBufferUtil.bytes("val2")));
+ builder.update(cfName).row("Column1").add("val", "val1").build();
+
new RowUpdateBuilder(cfs.metadata(), 0, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
new RowUpdateBuilder(cfs.metadata(), 0, "key2").clustering("Column1").add("val", "val1").build().applyUnsafe();
assertRangeCount(cfs, col, val, 2);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/CounterMutationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterMutationTest.java b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
index f36deea..9be0960 100644
--- a/test/unit/org/apache/cassandra/db/CounterMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterMutationTest.java
@@ -118,20 +118,20 @@ public class CounterMutationTest
cfsTwo.truncateBlocking();
// Do the update (+1, -1), (+2, -2)
- Mutation batch = new Mutation(KEYSPACE1, Util.dk("key1"));
+ Mutation.PartitionUpdateCollector batch = new Mutation.PartitionUpdateCollector(KEYSPACE1, Util.dk("key1"));
batch.add(new RowUpdateBuilder(cfsOne.metadata(), 5, "key1")
.clustering("cc")
.add("val", 1L)
.add("val2", -1L)
- .build().get(cfsOne.metadata()));
+ .build().getPartitionUpdate(cfsOne.metadata()));
batch.add(new RowUpdateBuilder(cfsTwo.metadata(), 5, "key1")
.clustering("cc")
.add("val", 2L)
.add("val2", -2L)
- .build().get(cfsTwo.metadata()));
+ .build().getPartitionUpdate(cfsTwo.metadata()));
- new CounterMutation(batch, ConsistencyLevel.ONE).apply();
+ new CounterMutation(batch.build(), ConsistencyLevel.ONE).apply();
ColumnMetadata c1cfs1 = cfsOne.metadata().getColumn(ByteBufferUtil.bytes("val"));
ColumnMetadata c2cfs1 = cfsOne.metadata().getColumn(ByteBufferUtil.bytes("val2"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
index 95bb5a4..6ed43f7 100644
--- a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
@@ -78,8 +78,9 @@ public class DeletePartitionTest
store.forceBlockingFlush();
// delete the partition
- new Mutation(KEYSPACE1, key)
+ new Mutation.PartitionUpdateCollector(KEYSPACE1, key)
.add(PartitionUpdate.fullPartitionDelete(store.metadata(), key, 0, FBUtilities.nowInSeconds()))
+ .build()
.applyUnsafe();
if (flushAfterRemove)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 5134857..ca2765d 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -85,13 +85,13 @@ public class RowTest
@Test
public void testMergeRangeTombstones() throws InterruptedException
{
- PartitionUpdate update1 = new PartitionUpdate(metadata, dk, metadata.regularAndStaticColumns(), 1);
+ PartitionUpdate.Builder update1 = new PartitionUpdate.Builder(metadata, dk, metadata.regularAndStaticColumns(), 1);
writeRangeTombstone(update1, "1", "11", 123, 123);
writeRangeTombstone(update1, "2", "22", 123, 123);
writeRangeTombstone(update1, "3", "31", 123, 123);
writeRangeTombstone(update1, "4", "41", 123, 123);
- PartitionUpdate update2 = new PartitionUpdate(metadata, dk, metadata.regularAndStaticColumns(), 1);
+ PartitionUpdate.Builder update2 = new PartitionUpdate.Builder(metadata, dk, metadata.regularAndStaticColumns(), 1);
writeRangeTombstone(update2, "1", "11", 123, 123);
writeRangeTombstone(update2, "111", "112", 1230, 123);
writeRangeTombstone(update2, "2", "24", 123, 123);
@@ -99,7 +99,7 @@ public class RowTest
writeRangeTombstone(update2, "4", "41", 123, 1230);
writeRangeTombstone(update2, "5", "51", 123, 1230);
- try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(ImmutableList.of(update1.unfilteredIterator(), update2.unfilteredIterator()), nowInSeconds))
+ try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(ImmutableList.of(update1.build().unfilteredIterator(), update2.build().unfilteredIterator()), nowInSeconds))
{
Object[][] expected = new Object[][]{ { "1", "11", 123l, 123 },
{ "111", "112", 1230l, 123 },
@@ -204,7 +204,7 @@ public class RowTest
assertEquals(expected[3], deletionTime.localDeletionTime());
}
- public void writeRangeTombstone(PartitionUpdate update, Object start, Object end, long markedForDeleteAt, int localDeletionTime)
+ public void writeRangeTombstone(PartitionUpdate.Builder update, Object start, Object end, long markedForDeleteAt, int localDeletionTime)
{
ClusteringComparator comparator = cfs.getComparator();
update.add(new RangeTombstone(Slice.make(comparator.make(start), comparator.make(end)), new DeletionTime(markedForDeleteAt, localDeletionTime)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
index f4eafa3..706b274 100644
--- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -109,26 +109,16 @@ public class RowUpdateBuilder
public PartitionUpdate buildUpdate()
{
- PartitionUpdate update = updateBuilder.build();
for (RangeTombstone rt : rts)
- update.add(rt);
- return update;
+ updateBuilder.addRangeTombstone(rt);
+ return updateBuilder.build();
}
- private static void deleteRow(PartitionUpdate update, long timestamp, int localDeletionTime, Object... clusteringValues)
+ private static void deleteRow(PartitionUpdate.Builder updateBuilder, long timestamp, int localDeletionTime, Object... clusteringValues)
{
- assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
-
- boolean isStatic = clusteringValues.length != update.metadata().comparator.size();
- Row.Builder builder = BTreeRow.sortedBuilder();
-
- if (isStatic)
- builder.newRow(Clustering.STATIC_CLUSTERING);
- else
- builder.newRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues));
- builder.addRowDeletion(Row.Deletion.regular(new DeletionTime(timestamp, localDeletionTime)));
-
- update.add(builder.build());
+ SimpleBuilders.RowBuilder b = new SimpleBuilders.RowBuilder(updateBuilder.metadata(), clusteringValues);
+ b.nowInSec(localDeletionTime).timestamp(timestamp).delete();
+ updateBuilder.add(b.build());
}
public static Mutation deleteRow(TableMetadata metadata, long timestamp, Object key, Object... clusteringValues)
@@ -138,11 +128,9 @@ public class RowUpdateBuilder
public static Mutation deleteRowAt(TableMetadata metadata, long timestamp, int localDeletionTime, Object key, Object... clusteringValues)
{
- PartitionUpdate update = new PartitionUpdate(metadata, makeKey(metadata, key), metadata.regularAndStaticColumns(), 0);
+ PartitionUpdate.Builder update = new PartitionUpdate.Builder(metadata, makeKey(metadata, key), metadata.regularAndStaticColumns(), 0);
deleteRow(update, timestamp, localDeletionTime, clusteringValues);
- // note that the created mutation may get further update later on, so we don't use the ctor that create a singletonMap
- // underneath (this class if for convenience, not performance)
- return new Mutation(update.metadata().keyspace, update.partitionKey()).add(update);
+ return new Mutation.PartitionUpdateCollector(update.metadata().keyspace, update.partitionKey()).add(update.build()).build();
}
private static DecoratedKey makeKey(TableMetadata metadata, Object... partitionKey)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
index 6eaf2c8..ca76e45 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java
@@ -170,7 +170,7 @@ public class CommitLogReaderTest extends CQLTester
int j = 0;
while (i + j < handler.seenMutationCount())
{
- PartitionUpdate pu = handler.seenMutations.get(i + j).get(currentTableMetadata());
+ PartitionUpdate pu = handler.seenMutations.get(i + j).getPartitionUpdate(currentTableMetadata());
if (pu == null)
{
j++;
@@ -234,7 +234,7 @@ public class CommitLogReaderTest extends CQLTester
public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
{
- if ((metadata == null) || (metadata != null && m.get(metadata) != null)) {
+ if ((metadata == null) || (metadata != null && m.getPartitionUpdate(metadata) != null)) {
seenMutations.add(m);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 5f0b832..fc193e8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -193,8 +193,9 @@ public class CompactionsPurgeTest
}
cfs.forceBlockingFlush();
- new Mutation(KEYSPACE1, dk(key))
+ new Mutation.PartitionUpdateCollector(KEYSPACE1, dk(key))
.add(PartitionUpdate.fullPartitionDelete(cfs.metadata(), dk(key), Long.MAX_VALUE, FBUtilities.nowInSeconds()))
+ .build()
.applyUnsafe();
cfs.forceBlockingFlush();
@@ -423,9 +424,9 @@ public class CompactionsPurgeTest
}
// deletes partition
- Mutation rm = new Mutation(KEYSPACE_CACHED, dk(key));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KEYSPACE_CACHED, dk(key));
rm.add(PartitionUpdate.fullPartitionDelete(cfs.metadata(), dk(key), 1, FBUtilities.nowInSeconds()));
- rm.applyUnsafe();
+ rm.build().applyUnsafe();
// Adds another unrelated partition so that the sstable is not considered fully expired. We do not
// invalidate the row cache in that latter case.
@@ -463,9 +464,9 @@ public class CompactionsPurgeTest
}
// deletes partition with timestamp such that not all columns are deleted
- Mutation rm = new Mutation(KEYSPACE1, dk(key));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KEYSPACE1, dk(key));
rm.add(PartitionUpdate.fullPartitionDelete(cfs.metadata(), dk(key), 4, FBUtilities.nowInSeconds()));
- rm.applyUnsafe();
+ rm.build().applyUnsafe();
ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build());
assertFalse(partition.partitionLevelDeletion().isLive());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
index 0f83dac..f887ede 100644
--- a/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
+++ b/test/unit/org/apache/cassandra/db/partition/PartitionUpdateTest.java
@@ -47,4 +47,19 @@ public class PartitionUpdateTest extends CQLTester
builder.newRow(1).add("a", 1);
Assert.assertEquals(2, builder.build().operationCount());
}
+
+ @Test
+ public void testUpdateAllTimestamp()
+ {
+ createTable("CREATE TABLE %s (key text, clustering int, a int, b int, c int, s int static, PRIMARY KEY(key, clustering))");
+ TableMetadata cfm = currentTableMetadata();
+
+ long timestamp = FBUtilities.timestampMicros();
+ RowUpdateBuilder rub = new RowUpdateBuilder(cfm, timestamp, "key0").clustering(1).add("a", 1);
+ PartitionUpdate pu = rub.buildUpdate();
+ PartitionUpdate pu2 = new PartitionUpdate.Builder(pu, 0).updateAllTimestamp(0).build();
+
+ Assert.assertTrue(pu.maxTimestamp() > 0);
+ Assert.assertTrue(pu2.maxTimestamp() == 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
index fb35ead..0d790fc 100644
--- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@ -366,11 +366,11 @@ public class RowAndDeletionMergeIteratorTest
private Iterator<Row> createRowIterator()
{
- PartitionUpdate update = new PartitionUpdate(cfm, dk, cfm.regularAndStaticColumns(), 1);
+ PartitionUpdate.Builder update = new PartitionUpdate.Builder(cfm, dk, cfm.regularAndStaticColumns(), 1);
for (int i = 0; i < 5; i++)
addRow(update, i, i);
- return update.iterator();
+ return update.build().iterator();
}
private UnfilteredRowIterator createMergeIterator(Iterator<Row> rows, Iterator<RangeTombstone> tombstones, boolean reversed)
@@ -395,7 +395,7 @@ public class RowAndDeletionMergeIteratorTest
true);
}
- private void addRow(PartitionUpdate update, int col1, int a)
+ private void addRow(PartitionUpdate.Builder update, int col1, int a)
{
update.add(BTreeRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(defA, a, 0)));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index 406832a..071f84d 100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@ -767,7 +767,7 @@ public class SASIIndexTest
{
ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
- Mutation rm1 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
+ Mutation.PartitionUpdateCollector rm1 = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
rm1.add(PartitionUpdate.singleRowUpdate(store.metadata(),
rm1.key(),
buildRow(buildCell(store.metadata(),
@@ -775,7 +775,7 @@ public class SASIIndexTest
AsciiType.instance.decompose("jason"),
System.currentTimeMillis()))));
- Mutation rm2 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key2")));
+ Mutation.PartitionUpdateCollector rm2 = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key2")));
rm2.add(PartitionUpdate.singleRowUpdate(store.metadata(),
rm2.key(),
buildRow(buildCell(store.metadata(),
@@ -783,7 +783,7 @@ public class SASIIndexTest
AsciiType.instance.decompose("pavel"),
System.currentTimeMillis()))));
- Mutation rm3 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key3")));
+ Mutation.PartitionUpdateCollector rm3 = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key3")));
rm3.add(PartitionUpdate.singleRowUpdate(store.metadata(),
rm3.key(),
buildRow(buildCell(store.metadata(),
@@ -791,9 +791,9 @@ public class SASIIndexTest
AsciiType.instance.decompose("Aleksey"),
System.currentTimeMillis()))));
- rm1.apply();
- rm2.apply();
- rm3.apply();
+ rm1.build().apply();
+ rm2.build().apply();
+ rm3.build().apply();
if (forceFlush)
store.forceBlockingFlush();
@@ -1095,13 +1095,13 @@ public class SASIIndexTest
final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
final ByteBuffer age = UTF8Type.instance.decompose("age");
- Mutation rm = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
update(rm, new ArrayList<Cell>()
{{
add(buildCell(age, LongType.instance.decompose(26L), System.currentTimeMillis()));
add(buildCell(firstName, AsciiType.instance.decompose("pavel"), System.currentTimeMillis()));
}});
- rm.apply();
+ rm.build().apply();
store.forceBlockingFlush();
@@ -1127,25 +1127,25 @@ public class SASIIndexTest
final ByteBuffer comment = UTF8Type.instance.decompose("comment");
- Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
update(rm, comment, UTF8Type.instance.decompose("ⓈⓅⒺⒸⒾⒶⓁ ⒞⒣⒜⒭⒮ and normal ones"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key2"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
update(rm, comment, UTF8Type.instance.decompose("龍馭鬱"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key3"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
update(rm, comment, UTF8Type.instance.decompose("インディアナ"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key4"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
update(rm, comment, UTF8Type.instance.decompose("レストラン"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key5"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key5"));
update(rm, comment, UTF8Type.instance.decompose("ベンジャミン ウエスト"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
if (forceFlush)
store.forceBlockingFlush();
@@ -1203,21 +1203,21 @@ public class SASIIndexTest
final ByteBuffer comment = UTF8Type.instance.decompose("comment_suffix_split");
- Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
update(rm, comment, UTF8Type.instance.decompose("龍馭鬱"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key2"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
update(rm, comment, UTF8Type.instance.decompose("インディアナ"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key3"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
update(rm, comment, UTF8Type.instance.decompose("レストラン"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key4"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
update(rm, comment, UTF8Type.instance.decompose("ベンジャミン ウエスト"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
if (forceFlush)
store.forceBlockingFlush();
@@ -1272,9 +1272,9 @@ public class SASIIndexTest
final ByteBuffer bigValue = UTF8Type.instance.decompose(new String(randomBytes));
- Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
update(rm, comment, bigValue, System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
Set<String> rows;
@@ -1352,37 +1352,37 @@ public class SASIIndexTest
final ByteBuffer fullName = UTF8Type.instance.decompose("/output/full-name/");
- Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
update(rm, fullName, UTF8Type.instance.decompose("美加 八田"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key2"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
update(rm, fullName, UTF8Type.instance.decompose("仁美 瀧澤"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key3"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
update(rm, fullName, UTF8Type.instance.decompose("晃宏 高須"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key4"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
update(rm, fullName, UTF8Type.instance.decompose("弘孝 大竹"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key5"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key5"));
update(rm, fullName, UTF8Type.instance.decompose("満枝 榎本"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key6"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key6"));
update(rm, fullName, UTF8Type.instance.decompose("飛鳥 上原"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key7"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key7"));
update(rm, fullName, UTF8Type.instance.decompose("大輝 鎌田"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key8"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key8"));
update(rm, fullName, UTF8Type.instance.decompose("利久 寺地"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
store.forceBlockingFlush();
@@ -1408,17 +1408,17 @@ public class SASIIndexTest
final ByteBuffer comment = UTF8Type.instance.decompose("address");
- Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
update(rm, comment, UTF8Type.instance.decompose("577 Rogahn Valleys Apt. 178"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key2"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
update(rm, comment, UTF8Type.instance.decompose("89809 Beverly Course Suite 089"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key3"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
update(rm, comment, UTF8Type.instance.decompose("165 clydie oval apt. 399"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
if (forceFlush)
store.forceBlockingFlush();
@@ -1483,42 +1483,42 @@ public class SASIIndexTest
final ByteBuffer name = UTF8Type.instance.decompose("first_name_prefix");
- Mutation rm;
+ Mutation.PartitionUpdateCollector rm;
- rm = new Mutation(KS_NAME, decoratedKey("key1"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key1"));
update(rm, name, UTF8Type.instance.decompose("Pavel"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key2"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key2"));
update(rm, name, UTF8Type.instance.decompose("Jordan"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key3"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key3"));
update(rm, name, UTF8Type.instance.decompose("Mikhail"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key4"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key4"));
update(rm, name, UTF8Type.instance.decompose("Michael"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key5"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key5"));
update(rm, name, UTF8Type.instance.decompose("Johnny"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
// first flush would make interval for name - 'johnny' -> 'pavel'
store.forceBlockingFlush();
- rm = new Mutation(KS_NAME, decoratedKey("key6"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key6"));
update(rm, name, UTF8Type.instance.decompose("Jason"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key7"));
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key7"));
update(rm, name, UTF8Type.instance.decompose("Vijay"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
- rm = new Mutation(KS_NAME, decoratedKey("key8")); // this name is going to be tokenized
+ rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey("key8")); // this name is going to be tokenized
update(rm, name, UTF8Type.instance.decompose("Jean-Claude"), System.currentTimeMillis());
- rm.apply();
+ rm.build().apply();
// this flush is going to produce range - 'jason' -> 'vijay'
store.forceBlockingFlush();
@@ -2422,7 +2422,7 @@ public class SASIIndexTest
private static Mutation newMutation(String key, String firstName, String lastName, int age, long timestamp)
{
- Mutation rm = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose(key)));
+ Mutation.PartitionUpdateCollector rm = new Mutation.PartitionUpdateCollector(KS_NAME, decoratedKey(AsciiType.instance.decompose(key)));
List<Cell> cells = new ArrayList<>(3);
if (age >= 0)
@@ -2433,7 +2433,7 @@ public class SASIIndexTest
cells.add(buildCell(ByteBufferUtil.bytes("last_name"), UTF8Type.instance.decompose(lastName), timestamp));
update(rm, cells);
- return rm;
+ return rm.build();
}
private static Set<String> getKeys(final UnfilteredPartitionIterator rows)
@@ -2526,14 +2526,14 @@ public class SASIIndexTest
return new Expression(name, op, value);
}
- private static void update(Mutation rm, ByteBuffer name, ByteBuffer value, long timestamp)
+ private static void update(Mutation.PartitionUpdateCollector rm, ByteBuffer name, ByteBuffer value, long timestamp)
{
TableMetadata metadata = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata();
rm.add(PartitionUpdate.singleRowUpdate(metadata, rm.key(), buildRow(buildCell(metadata, name, value, timestamp))));
}
- private static void update(Mutation rm, List<Cell> cells)
+ private static void update(Mutation.PartitionUpdateCollector rm, List<Cell> cells)
{
TableMetadata metadata = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).metadata();
rm.add(PartitionUpdate.singleRowUpdate(metadata, rm.key(), buildRow(cells)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index d3257d7..2d12baf 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.net.MessagingService.Verb;
import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -59,9 +60,10 @@ public class WriteCallbackInfoTest
private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean allowHints, boolean expectHint) throws Exception
{
+ TableMetadata metadata = MockSchema.newTableMetadata("", "");
Object payload = verb == Verb.PAXOS_COMMIT
- ? new Commit(UUID.randomUUID(), new PartitionUpdate(MockSchema.newTableMetadata("", ""), ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1))
- : new Mutation("", new BufferDecoratedKey(new Murmur3Partitioner.LongToken(0), ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ ? new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build())
+ : new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
Assert.assertEquals(expectHint, wcbi.shouldHint());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
index aa5ee9b..50183e5 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -107,8 +107,8 @@ public class TriggerExecutorTest
TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfTrigger.class.getName()));
PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
- Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
- Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+ Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+ Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
assertEquals(2, tmutations.size());
@@ -133,8 +133,8 @@ public class TriggerExecutorTest
TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeySameCfPartialTrigger.class.getName()));
PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
- Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
- Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+ Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+ Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
assertEquals(2, tmutations.size());
@@ -159,8 +159,8 @@ public class TriggerExecutorTest
TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeyDifferentCfTrigger.class.getName()));
PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
- Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
- Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+ Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+ Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
assertEquals(2, tmutations.size());
@@ -210,8 +210,8 @@ public class TriggerExecutorTest
TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", SameKeyDifferentKsTrigger.class.getName()));
PartitionUpdate cf1 = makeCf(metadata, "k1", "k1v1", null);
PartitionUpdate cf2 = makeCf(metadata, "k2", "k2v1", null);
- Mutation rm1 = new Mutation("ks1", cf1.partitionKey()).add(cf1);
- Mutation rm2 = new Mutation("ks1", cf2.partitionKey()).add(cf2);
+ Mutation rm1 = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
+ Mutation rm2 = new Mutation.PartitionUpdateCollector("ks1", cf2.partitionKey()).add(cf2).build();
List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
assertEquals(4, tmutations.size());
@@ -248,7 +248,7 @@ public class TriggerExecutorTest
TableMetadata metadata = makeTableMetadata("ks1", "cf1", TriggerMetadata.create("test", DifferentKeyTrigger.class.getName()));
PartitionUpdate cf1 = makeCf(metadata, "k1", "v1", null);
- Mutation rm = new Mutation("ks1", cf1.partitionKey()).add(cf1);
+ Mutation rm = new Mutation.PartitionUpdateCollector("ks1", cf1.partitionKey()).add(cf1).build();
List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
assertEquals(2, tmutations.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/2] cassandra git commit: Make PartitionUpdate and Mutation
immutable
Posted by ma...@apache.org.
Make PartitionUpdate and Mutation immutable
Patch by marcuse; reviewed by Aleksey Yeschenko for CASSANDRA-13867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de7c24b3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de7c24b3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de7c24b3
Branch: refs/heads/trunk
Commit: de7c24b395265ff619c622ed6be8d88453f158ac
Parents: 45c7c45
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Sep 13 15:57:50 2017 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jan 9 08:10:23 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/BatchStatement.java | 4 +-
.../cql3/statements/BatchUpdatesCollector.java | 231 ++++++++++
.../cql3/statements/CQL3CasRequest.java | 26 +-
.../cql3/statements/DeleteStatement.java | 18 +-
.../cql3/statements/ModificationStatement.java | 33 +-
.../statements/SingleTableUpdatesCollector.java | 101 +++++
.../cql3/statements/UpdateStatement.java | 12 +-
.../cql3/statements/UpdatesCollector.java | 127 +-----
.../apache/cassandra/db/CounterMutation.java | 7 +-
src/java/org/apache/cassandra/db/IMutation.java | 6 +
src/java/org/apache/cassandra/db/Mutation.java | 137 +++---
.../org/apache/cassandra/db/SimpleBuilders.java | 25 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 9 +-
.../db/commitlog/CommitLogReplayer.java | 20 +-
.../db/partitions/PartitionUpdate.java | 448 +++++++++----------
.../apache/cassandra/db/view/TableViews.java | 16 +-
.../cassandra/db/view/ViewUpdateGenerator.java | 20 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 4 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 22 +-
.../io/sstable/SSTableSimpleWriter.java | 10 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 14 +-
.../apache/cassandra/service/DataResolver.java | 8 +-
.../apache/cassandra/service/paxos/Commit.java | 4 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 4 +-
.../cassandra/db/CounterMutationTest.java | 8 +-
.../cassandra/db/DeletePartitionTest.java | 3 +-
test/unit/org/apache/cassandra/db/RowTest.java | 8 +-
.../apache/cassandra/db/RowUpdateBuilder.java | 28 +-
.../db/commitlog/CommitLogReaderTest.java | 4 +-
.../db/compaction/CompactionsPurgeTest.java | 11 +-
.../db/partition/PartitionUpdateTest.java | 15 +
.../rows/RowAndDeletionMergeIteratorTest.java | 6 +-
.../cassandra/index/sasi/SASIIndexTest.java | 142 +++---
.../cassandra/net/WriteCallbackInfoTest.java | 6 +-
.../cassandra/triggers/TriggerExecutorTest.java | 18 +-
36 files changed, 912 insertions(+), 644 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba0f842..623bf8a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
* Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
* Fix cassandra-stress startup failure (CASSANDRA-14106)
* Remove initialDirectories from CFS (CASSANDRA-13928)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 7497f47..b54e3a0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -224,7 +224,7 @@ public class BatchStatement implements CQLStatement
throws RequestExecutionException, RequestValidationException
{
Set<String> tablesWithZeroGcGs = null;
- UpdatesCollector collector = new UpdatesCollector(updatedColumns, updatedRows());
+ BatchUpdatesCollector collector = new BatchUpdatesCollector(updatedColumns, updatedRows());
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
@@ -247,8 +247,6 @@ public class BatchStatement implements CQLStatement
ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
.getMessage());
}
-
- collector.validateIndexedColumns();
return collector.toMutations();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
new file mode 100644
index 0000000..9671b02
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+
+/**
+ * Utility class to collect updates.
+ *
+ * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when
+ * applying multiple batch to the same partition (see #6737). </p>
+ *
+ */
+final class BatchUpdatesCollector implements UpdatesCollector
+{
+ /**
+ * The columns that will be updated for each table (keyed by the table ID).
+ */
+ private final Map<TableId, RegularAndStaticColumns> updatedColumns;
+
+ /**
+ * The estimated number of updated row.
+ */
+ private final int updatedRows;
+
+ /**
+ * The mutations per keyspace.
+ */
+ private final Map<String, Map<ByteBuffer, IMutationBuilder>> mutationBuilders = new HashMap<>();
+
+ BatchUpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, int updatedRows)
+ {
+ super();
+ this.updatedColumns = updatedColumns;
+ this.updatedRows = updatedRows;
+ }
+
+ /**
+ * Gets the <code>PartitionUpdate.Builder</code> for the specified column family and key. If the builder does not
+ * exist it will be created.
+ *
+ * @param metadata the column family meta data
+ * @param dk the partition key
+ * @param consistency the consistency level
+ * @return the <code>PartitionUpdate.Builder</code> for the specified column family and key
+ */
+ public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
+ {
+ IMutationBuilder mut = getMutationBuilder(metadata, dk, consistency);
+ PartitionUpdate.Builder upd = mut.get(metadata.id);
+ if (upd == null)
+ {
+ RegularAndStaticColumns columns = updatedColumns.get(metadata.id);
+ assert columns != null;
+ upd = new PartitionUpdate.Builder(metadata, dk, columns, updatedRows);
+ mut.add(upd);
+ }
+ return upd;
+ }
+
+ private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
+ {
+ String ksName = metadata.keyspace;
+ IMutationBuilder mutationBuilder = keyspaceMap(ksName).get(dk.getKey());
+ if (mutationBuilder == null)
+ {
+ MutationBuilder builder = new MutationBuilder(ksName, dk);
+ mutationBuilder = metadata.isCounter() ? new CounterMutationBuilder(builder, consistency) : builder;
+ keyspaceMap(ksName).put(dk.getKey(), mutationBuilder);
+ }
+ return mutationBuilder;
+ }
+
+ /**
+ * Returns a collection containing all the mutations.
+ * @return a collection containing all the mutations.
+ */
+ public Collection<IMutation> toMutations()
+ {
+ //TODO: The case where all statement where on the same keyspace is pretty common, optimize for that?
+ List<IMutation> ms = new ArrayList<>();
+ for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values())
+ {
+ for (IMutationBuilder builder : ksMap.values())
+ {
+ IMutation mutation = builder.build();
+ mutation.validateIndexedColumns();
+ ms.add(mutation);
+ }
+ }
+ return ms;
+ }
+
+ /**
+ * Returns the key-mutation mappings for the specified keyspace.
+ *
+ * @param ksName the keyspace name
+ * @return the key-mutation mappings for the specified keyspace.
+ */
+ private Map<ByteBuffer, IMutationBuilder> keyspaceMap(String ksName)
+ {
+ return mutationBuilders.computeIfAbsent(ksName, k -> new HashMap<>());
+ }
+
+ private interface IMutationBuilder
+ {
+ /**
+ * Add a new PartitionUpdate builder to this mutation builder
+ * @param builder the builder to add
+ * @return this
+ */
+ IMutationBuilder add(PartitionUpdate.Builder builder);
+
+ /**
+ * Build the immutable mutation
+ */
+ IMutation build();
+
+ /**
+ * Get the builder for the given tableId
+ */
+ PartitionUpdate.Builder get(TableId tableId);
+ }
+
+ private static class MutationBuilder implements IMutationBuilder
+ {
+ private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>();
+ private final DecoratedKey key;
+ private final String keyspaceName;
+ private final long createdAt = System.currentTimeMillis();
+
+ private MutationBuilder(String keyspaceName, DecoratedKey key)
+ {
+ this.keyspaceName = keyspaceName;
+ this.key = key;
+ }
+
+ public MutationBuilder add(PartitionUpdate.Builder updateBuilder)
+ {
+ assert updateBuilder != null;
+ assert updateBuilder.partitionKey().getPartitioner() == key.getPartitioner();
+ PartitionUpdate.Builder prev = modifications.put(updateBuilder.metadata().id, updateBuilder);
+ if (prev != null)
+ // developer error
+ throw new IllegalArgumentException("Table " + updateBuilder.metadata().name + " already has modifications in this mutation: " + prev);
+ return this;
+ }
+
+ public Mutation build()
+ {
+ ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>();
+ for (Map.Entry<TableId, PartitionUpdate.Builder> updateEntry : modifications.entrySet())
+ {
+ PartitionUpdate update = updateEntry.getValue().build();
+ updates.put(updateEntry.getKey(), update);
+ }
+ return new Mutation(keyspaceName, key, updates.build(), createdAt);
+ }
+
+ public PartitionUpdate.Builder get(TableId tableId)
+ {
+ return modifications.get(tableId);
+ }
+
+ public DecoratedKey key()
+ {
+ return key;
+ }
+
+ public boolean isEmpty()
+ {
+ return modifications.isEmpty();
+ }
+
+ public String getKeyspaceName()
+ {
+ return keyspaceName;
+ }
+ }
+
+ private static class CounterMutationBuilder implements IMutationBuilder
+ {
+ private final MutationBuilder mutationBuilder;
+ private final ConsistencyLevel cl;
+
+ private CounterMutationBuilder(MutationBuilder mutationBuilder, ConsistencyLevel cl)
+ {
+ this.mutationBuilder = mutationBuilder;
+ this.cl = cl;
+ }
+
+ public IMutationBuilder add(PartitionUpdate.Builder builder)
+ {
+ return mutationBuilder.add(builder);
+ }
+
+ public IMutation build()
+ {
+ return new CounterMutation(mutationBuilder.build(), cl);
+ }
+
+ public PartitionUpdate.Builder get(TableId id)
+ {
+ return mutationBuilder.get(id);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index ec11df3..8619945 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -235,14 +235,16 @@ public class CQL3CasRequest implements CASRequest
public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException
{
- PartitionUpdate update = new PartitionUpdate(metadata, key, updatedColumns(), conditions.size());
+ PartitionUpdate.Builder updateBuilder = new PartitionUpdate.Builder(metadata, key, updatedColumns(), conditions.size());
for (RowUpdate upd : updates)
- upd.applyUpdates(current, update);
+ upd.applyUpdates(current, updateBuilder);
for (RangeDeletion upd : rangeDeletions)
- upd.applyUpdates(current, update);
+ upd.applyUpdates(current, updateBuilder);
- Keyspace.openAndGetStore(metadata).indexManager.validate(update);
- return update;
+ PartitionUpdate partitionUpdate = updateBuilder.build();
+ Keyspace.openAndGetStore(metadata).indexManager.validate(partitionUpdate);
+
+ return partitionUpdate;
}
/**
@@ -266,11 +268,11 @@ public class CQL3CasRequest implements CASRequest
this.timestamp = timestamp;
}
- public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException
+ public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException
{
- Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
- UpdateParameters params = new UpdateParameters(metadata, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map);
- stmt.addUpdateForKey(updates, clustering, params);
+ Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.singletonMap(key, current) : null;
+ UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+ stmt.addUpdateForKey(updateBuilder, clustering, params);
}
}
@@ -289,12 +291,12 @@ public class CQL3CasRequest implements CASRequest
this.timestamp = timestamp;
}
- public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException
+ public void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder) throws InvalidRequestException
{
// No slice statements currently require a read, but this maintains consistency with RowUpdate, and future proofs us
Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
- UpdateParameters params = new UpdateParameters(metadata, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map);
- stmt.addUpdateForKey(updates, slice, params);
+ UpdateParameters params = new UpdateParameters(metadata, updateBuilder.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+ stmt.addUpdateForKey(updateBuilder, slice, params);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index e880bf8..8f3349e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -50,7 +50,7 @@ public class DeleteStatement extends ModificationStatement
}
@Override
- public void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params)
+ public void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params)
throws InvalidRequestException
{
TableMetadata metadata = metadata();
@@ -63,19 +63,19 @@ public class DeleteStatement extends ModificationStatement
// We're not deleting any specific columns so it's either a full partition deletion ....
if (clustering.size() == 0)
{
- update.addPartitionDeletion(params.deletionTime());
+ updateBuilder.addPartitionDeletion(params.deletionTime());
}
// ... or a row deletion ...
else if (clustering.size() == metadata.clusteringColumns().size())
{
params.newRow(clustering);
params.addRowDeletion();
- update.add(params.buildRow());
+ updateBuilder.add(params.buildRow());
}
// ... or a range of rows deletion.
else
{
- update.add(params.makeRangeTombstone(metadata.comparator, clustering));
+ updateBuilder.add(params.makeRangeTombstone(metadata.comparator, clustering));
}
}
else
@@ -91,22 +91,22 @@ public class DeleteStatement extends ModificationStatement
params.newRow(clustering);
for (Operation op : regularDeletions)
- op.execute(update.partitionKey(), params);
- update.add(params.buildRow());
+ op.execute(updateBuilder.partitionKey(), params);
+ updateBuilder.add(params.buildRow());
}
if (!staticDeletions.isEmpty())
{
params.newRow(Clustering.STATIC_CLUSTERING);
for (Operation op : staticDeletions)
- op.execute(update.partitionKey(), params);
- update.add(params.buildRow());
+ op.execute(updateBuilder.partitionKey(), params);
+ updateBuilder.add(params.buildRow());
}
}
}
@Override
- public void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params)
+ public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateParameters params)
{
List<Operation> regularDeletions = getRegularOperations();
List<Operation> staticDeletions = getStaticOperations();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index decb99f..f0cfd0d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -174,9 +174,9 @@ public abstract class ModificationStatement implements CQLStatement
return restrictions;
}
- public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params);
+ public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params);
- public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params);
+ public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Slice slice, UpdateParameters params);
public int getBoundTerms()
{
@@ -643,10 +643,8 @@ public abstract class ModificationStatement implements CQLStatement
*/
private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime)
{
- UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(metadata.id, updatedColumns), 1);
+ UpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, 1);
addUpdates(collector, options, local, now, queryStartNanoTime);
- collector.validateIndexedColumns();
-
return collector.toMutations();
}
@@ -678,10 +676,10 @@ public abstract class ModificationStatement implements CQLStatement
Validation.validateKey(metadata(), key);
DecoratedKey dk = metadata().partitioner.decorateKey(key);
- PartitionUpdate upd = collector.getPartitionUpdate(metadata(), dk, options.getConsistency());
+ PartitionUpdate.Builder updateBuilder = collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency());
for (Slice slice : slices)
- addUpdateForKey(upd, slice, params);
+ addUpdateForKey(updateBuilder, slice, params);
}
}
else
@@ -699,25 +697,24 @@ public abstract class ModificationStatement implements CQLStatement
Validation.validateKey(metadata(), key);
DecoratedKey dk = metadata().partitioner.decorateKey(key);
- PartitionUpdate upd = collector.getPartitionUpdate(metadata, dk, options.getConsistency());
+ PartitionUpdate.Builder updateBuilder = collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency());
if (!restrictions.hasClusteringColumnsRestrictions())
{
- addUpdateForKey(upd, Clustering.EMPTY, params);
+ addUpdateForKey(updateBuilder, Clustering.EMPTY, params);
}
else
{
for (Clustering clustering : clusterings)
{
- for (ByteBuffer c : clustering.getRawValues())
- {
- if (c != null && c.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
- throw new InvalidRequestException(String.format("Key length of %d is longer than maximum of %d",
- clustering.dataSize(),
- FBUtilities.MAX_UNSIGNED_SHORT));
- }
-
- addUpdateForKey(upd, clustering, params);
+ for (ByteBuffer c : clustering.getRawValues())
+ {
+ if (c != null && c.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+ throw new InvalidRequestException(String.format("Key length of %d is longer than maximum of %d",
+ clustering.dataSize(),
+ FBUtilities.MAX_UNSIGNED_SHORT));
+ }
+ addUpdateForKey(updateBuilder, clustering, params);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
new file mode 100644
index 0000000..9eaf897
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Utility class to collect updates.
+ */
+final class SingleTableUpdatesCollector implements UpdatesCollector
+{
+ /**
+ * the table to be updated
+ */
+ private final TableMetadata metadata;
+
+ /**
+ * the columns to update
+ */
+ private final RegularAndStaticColumns updatedColumns;
+
+ /**
+ * The estimated number of updated row.
+ */
+ private final int updatedRows;
+
+ /**
+ * the partition update builders per key
+ */
+ private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders = new HashMap<>();
+
+ /**
+ * if it is a counter table, we will set this
+ */
+ private ConsistencyLevel counterConsistencyLevel = null;
+
+ SingleTableUpdatesCollector(TableMetadata metadata, RegularAndStaticColumns updatedColumns, int updatedRows)
+ {
+ this.metadata = metadata;
+ this.updatedColumns = updatedColumns;
+ this.updatedRows = updatedRows;
+ }
+
+ public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
+ {
+ if (metadata.isCounter())
+ counterConsistencyLevel = consistency;
+ return puBuilders.computeIfAbsent(dk.getKey(), (k) -> new PartitionUpdate.Builder(metadata, dk, updatedColumns, updatedRows));
+ }
+
+ /**
+ * Returns a collection containing all the mutations.
+ * @return a collection containing all the mutations.
+ */
+ public Collection<IMutation> toMutations()
+ {
+ List<IMutation> ms = new ArrayList<>();
+ for (PartitionUpdate.Builder builder : puBuilders.values())
+ {
+ IMutation mutation = null;
+
+ if (metadata.isCounter())
+ mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel);
+ else
+ mutation = new Mutation(builder.build());
+ mutation.validateIndexedColumns();
+ ms.add(mutation);
+ }
+
+ return ms;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 7a2a1ba..a373ba1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -63,7 +63,7 @@ public class UpdateStatement extends ModificationStatement
}
@Override
- public void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params)
+ public void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering clustering, UpdateParameters params)
{
if (updatesRegularRows())
{
@@ -90,22 +90,22 @@ public class UpdateStatement extends ModificationStatement
}
for (Operation op : updates)
- op.execute(update.partitionKey(), params);
+ op.execute(updateBuilder.partitionKey(), params);
- update.add(params.buildRow());
+ updateBuilder.add(params.buildRow());
}
if (updatesStaticRow())
{
params.newRow(Clustering.STATIC_CLUSTERING);
for (Operation op : getStaticOperations())
- op.execute(update.partitionKey(), params);
- update.add(params.buildRow());
+ op.execute(updateBuilder.partitionKey(), params);
+ updateBuilder.add(params.buildRow());
}
}
@Override
- public void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params)
+ public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateParameters params)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index f16c619..30db7ca 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -15,127 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.cassandra.cql3.statements;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collection;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableMetadata;
-/**
- * Utility class to collect updates.
- *
- * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when
- * applying multiple batch to the same partition (see #6737). </p>
- *
- */
-final class UpdatesCollector
+public interface UpdatesCollector
{
- /**
- * The columns that will be updated for each table (keyed by the table ID).
- */
- private final Map<TableId, RegularAndStaticColumns> updatedColumns;
-
- /**
- * The estimated number of updated row.
- */
- private final int updatedRows;
-
- /**
- * The mutations per keyspace.
- */
- private final Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
-
- public UpdatesCollector(Map<TableId, RegularAndStaticColumns> updatedColumns, int updatedRows)
- {
- super();
- this.updatedColumns = updatedColumns;
- this.updatedRows = updatedRows;
- }
-
- /**
- * Gets the <code>PartitionUpdate</code> for the specified column family and key. If the update does not
- * exist it will be created.
- *
- * @param metadata the column family meta data
- * @param dk the partition key
- * @param consistency the consistency level
- * @return the <code>PartitionUpdate</code> for the specified column family and key
- */
- public PartitionUpdate getPartitionUpdate(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
- {
- Mutation mut = getMutation(metadata, dk, consistency);
- PartitionUpdate upd = mut.get(metadata);
- if (upd == null)
- {
- RegularAndStaticColumns columns = updatedColumns.get(metadata.id);
- assert columns != null;
- upd = new PartitionUpdate(metadata, dk, columns, updatedRows);
- mut.add(upd);
- }
- return upd;
- }
-
- /**
- * Check all partition updates contain only valid values for any
- * indexed columns.
- */
- public void validateIndexedColumns()
- {
- for (Map<ByteBuffer, IMutation> perKsMutations : mutations.values())
- for (IMutation mutation : perKsMutations.values())
- for (PartitionUpdate update : mutation.getPartitionUpdates())
- Keyspace.openAndGetStore(update.metadata()).indexManager.validate(update);
- }
-
- private Mutation getMutation(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency)
- {
- String ksName = metadata.keyspace;
- IMutation mutation = keyspaceMap(ksName).get(dk.getKey());
- if (mutation == null)
- {
- Mutation mut = new Mutation(ksName, dk);
- mutation = metadata.isCounter() ? new CounterMutation(mut, consistency) : mut;
- keyspaceMap(ksName).put(dk.getKey(), mutation);
- return mut;
- }
- return metadata.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
- }
-
- /**
- * Returns a collection containing all the mutations.
- * @return a collection containing all the mutations.
- */
- public Collection<IMutation> toMutations()
- {
- // The case where all statement where on the same keyspace is pretty common
- if (mutations.size() == 1)
- return mutations.values().iterator().next().values();
-
- List<IMutation> ms = new ArrayList<>();
- for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
- ms.addAll(ksMap.values());
-
- return ms;
- }
-
- /**
- * Returns the key-mutation mappings for the specified keyspace.
- *
- * @param ksName the keyspace name
- * @return the key-mutation mappings for the specified keyspace.
- */
- private Map<ByteBuffer, IMutation> keyspaceMap(String ksName)
- {
- Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
- if (ksMap == null)
- {
- ksMap = new HashMap<>();
- mutations.put(ksName, ksMap);
- }
- return ksMap;
- }
+ PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency);
+ Collection<IMutation> toMutations();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 0f1ad06..d04ddd8 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
@@ -112,7 +113,7 @@ public class CounterMutation implements IMutation
*/
public Mutation applyCounterMutation() throws WriteTimeoutException
{
- Mutation result = new Mutation(getKeyspaceName(), key());
+ Mutation.PartitionUpdateCollector resultBuilder = new Mutation.PartitionUpdateCollector(getKeyspaceName(), key());
Keyspace keyspace = Keyspace.open(getKeyspaceName());
List<Lock> locks = new ArrayList<>();
@@ -121,7 +122,9 @@ public class CounterMutation implements IMutation
{
grabCounterLocks(keyspace, locks);
for (PartitionUpdate upd : getPartitionUpdates())
- result.add(processModifications(upd));
+ resultBuilder.add(processModifications(upd));
+
+ Mutation result = resultBuilder.build();
result.apply();
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 3d4b1b2..9eaf19b 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -32,6 +32,12 @@ public interface IMutation
public String toString(boolean shallow);
public Collection<PartitionUpdate> getPartitionUpdates();
+ public default void validateIndexedColumns()
+ {
+ for (PartitionUpdate pu : getPartitionUpdates())
+ pu.validateIndexedColumns();
+ }
+
/**
* Computes the total data size of the specified mutations.
* @param mutations the mutations
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 062e1fe..a6a920c 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -22,6 +22,8 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -37,8 +39,6 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
-// TODO convert this to a Builder pattern instead of encouraging M.add directly,
-// which is less-efficient since we have to keep a mutable HashMap around
public class Mutation implements IMutation
{
public static final MutationSerializer serializer = new MutationSerializer();
@@ -52,37 +52,31 @@ public class Mutation implements IMutation
private final DecoratedKey key;
// map of column family id to mutations for that column family.
- private final Map<TableId, PartitionUpdate> modifications;
+ private final ImmutableMap<TableId, PartitionUpdate> modifications;
- // Time at which this mutation was instantiated
- public final long createdAt = System.currentTimeMillis();
+ // Time at which this mutation or the builder that built it was instantiated
+ final long createdAt;
// keep track of when mutation has started waiting for a MV partition lock
- public final AtomicLong viewLockAcquireStart = new AtomicLong(0);
+ final AtomicLong viewLockAcquireStart = new AtomicLong(0);
- private boolean cdcEnabled = false;
-
- public Mutation(String keyspaceName, DecoratedKey key)
- {
- this(keyspaceName, key, new HashMap<>());
- }
+ private final boolean cdcEnabled;
public Mutation(PartitionUpdate update)
{
- this(update.metadata().keyspace, update.partitionKey(), Collections.singletonMap(update.metadata().id, update));
+ this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), System.currentTimeMillis());
}
- protected Mutation(String keyspaceName, DecoratedKey key, Map<TableId, PartitionUpdate> modifications)
+ public Mutation(String keyspaceName, DecoratedKey key, ImmutableMap<TableId, PartitionUpdate> modifications, long createdAt)
{
this.keyspaceName = keyspaceName;
this.key = key;
this.modifications = modifications;
- for (PartitionUpdate pu : modifications.values())
- cdcEnabled |= pu.metadata().params.cdc;
- }
- public Mutation copy()
- {
- return new Mutation(keyspaceName, key, new HashMap<>(modifications));
+ boolean cdc = false;
+ for (PartitionUpdate pu : modifications.values())
+ cdc |= pu.metadata().params.cdc;
+ this.cdcEnabled = cdc;
+ this.createdAt = createdAt;
}
public Mutation without(Set<TableId> tableIds)
@@ -90,15 +84,16 @@ public class Mutation implements IMutation
if (tableIds.isEmpty())
return this;
- Mutation copy = copy();
-
- copy.modifications.keySet().removeAll(tableIds);
-
- copy.cdcEnabled = false;
- for (PartitionUpdate pu : modifications.values())
- copy.cdcEnabled |= pu.metadata().params.cdc;
+ ImmutableMap.Builder<TableId, PartitionUpdate> builder = new ImmutableMap.Builder<>();
+ for (Map.Entry<TableId, PartitionUpdate> update : modifications.entrySet())
+ {
+ if (!tableIds.contains(update.getKey()))
+ {
+ builder.put(update);
+ }
+ }
- return copy;
+ return new Mutation(keyspaceName, key, builder.build(), createdAt);
}
public Mutation without(TableId tableId)
@@ -121,7 +116,7 @@ public class Mutation implements IMutation
return key;
}
- public Collection<PartitionUpdate> getPartitionUpdates()
+ public ImmutableCollection<PartitionUpdate> getPartitionUpdates()
{
return modifications.values();
}
@@ -131,33 +126,6 @@ public class Mutation implements IMutation
return table == null ? null : modifications.get(table.id);
}
- /**
- * Adds PartitionUpdate to the local set of modifications.
- * Assumes no updates for the Table this PartitionUpdate impacts.
- *
- * @param update PartitionUpdate to append to Modifications list
- * @return Mutation this mutation
- * @throws IllegalArgumentException If PartitionUpdate for duplicate table is passed as argument
- */
- public Mutation add(PartitionUpdate update)
- {
- assert update != null;
- assert update.partitionKey().getPartitioner() == key.getPartitioner();
-
- cdcEnabled |= update.metadata().params.cdc;
-
- PartitionUpdate prev = modifications.put(update.metadata().id, update);
- if (prev != null)
- // developer error
- throw new IllegalArgumentException("Table " + update.metadata().name + " already has modifications in this mutation: " + prev);
- return this;
- }
-
- public PartitionUpdate get(TableMetadata metadata)
- {
- return modifications.get(metadata.id);
- }
-
public boolean isEmpty()
{
return modifications.isEmpty();
@@ -196,7 +164,7 @@ public class Mutation implements IMutation
}
List<PartitionUpdate> updates = new ArrayList<>(mutations.size());
- Map<TableId, PartitionUpdate> modifications = new HashMap<>(updatedTables.size());
+ ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
for (TableId table : updatedTables)
{
for (Mutation mutation : mutations)
@@ -212,7 +180,7 @@ public class Mutation implements IMutation
modifications.put(table, updates.size() == 1 ? updates.get(0) : PartitionUpdate.merge(updates));
updates.clear();
}
- return new Mutation(ks, key, modifications);
+ return new Mutation(ks, key, modifications.build(), System.currentTimeMillis());
}
public CompletableFuture<?> applyFuture()
@@ -389,7 +357,7 @@ public class Mutation implements IMutation
if (size == 1)
return new Mutation(update);
- Map<TableId, PartitionUpdate> modifications = new HashMap<>(size);
+ ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
DecoratedKey dk = update.partitionKey();
modifications.put(update.metadata().id, update);
@@ -398,8 +366,7 @@ public class Mutation implements IMutation
update = PartitionUpdate.serializer.deserialize(in, version, flag);
modifications.put(update.metadata().id, update);
}
-
- return new Mutation(update.metadata().keyspace, dk, modifications);
+ return new Mutation(update.metadata().keyspace, dk, modifications.build(), System.currentTimeMillis());
}
public Mutation deserialize(DataInputPlus in, int version) throws IOException
@@ -416,4 +383,52 @@ public class Mutation implements IMutation
return size;
}
}
+
+ /**
+ * Collects finalized partition updates
+ */
+ public static class PartitionUpdateCollector
+ {
+ private final ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
+ private final String keyspaceName;
+ private final DecoratedKey key;
+ private final long createdAt = System.currentTimeMillis();
+ private boolean empty = true;
+
+ public PartitionUpdateCollector(String keyspaceName, DecoratedKey key)
+ {
+ this.keyspaceName = keyspaceName;
+ this.key = key;
+ }
+
+ public PartitionUpdateCollector add(PartitionUpdate partitionUpdate)
+ {
+ assert partitionUpdate != null;
+ assert partitionUpdate.partitionKey().getPartitioner() == key.getPartitioner();
+ // note that ImmutableMap.Builder only allows put:ing the same key once, it will fail during build() below otherwise
+ modifications.put(partitionUpdate.metadata().id, partitionUpdate);
+ empty = false;
+ return this;
+ }
+
+ public DecoratedKey key()
+ {
+ return key;
+ }
+
+ public String getKeyspaceName()
+ {
+ return keyspaceName;
+ }
+
+ public boolean isEmpty()
+ {
+ return empty;
+ }
+
+ public Mutation build()
+ {
+ return new Mutation(keyspaceName, key, modifications.build(), createdAt);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/SimpleBuilders.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleBuilders.java b/src/java/org/apache/cassandra/db/SimpleBuilders.java
index a212834..3520d97 100644
--- a/src/java/org/apache/cassandra/db/SimpleBuilders.java
+++ b/src/java/org/apache/cassandra/db/SimpleBuilders.java
@@ -146,10 +146,10 @@ public abstract class SimpleBuilders
if (updateBuilders.size() == 1)
return new Mutation(updateBuilders.values().iterator().next().build());
- Mutation mutation = new Mutation(keyspaceName, key);
+ Mutation.PartitionUpdateCollector mutationBuilder = new Mutation.PartitionUpdateCollector(keyspaceName, key);
for (PartitionUpdateBuilder builder : updateBuilders.values())
- mutation.add(builder.build());
- return mutation;
+ mutationBuilder.add(builder.build());
+ return mutationBuilder.build();
}
}
@@ -159,6 +159,7 @@ public abstract class SimpleBuilders
private final DecoratedKey key;
private final Map<Clustering, RowBuilder> rowBuilders = new HashMap<>();
private List<RTBuilder> rangeBuilders = null; // We use that rarely, so create lazily
+ private List<RangeTombstone> rangeTombstones = null;
private DeletionTime partitionDeletion = DeletionTime.LIVE;
@@ -204,6 +205,14 @@ public abstract class SimpleBuilders
return builder;
}
+ public PartitionUpdate.SimpleBuilder addRangeTombstone(RangeTombstone rt)
+ {
+ if (rangeTombstones == null)
+ rangeTombstones = new ArrayList<>();
+ rangeTombstones.add(rt);
+ return this;
+ }
+
public PartitionUpdate build()
{
// Collect all updated columns
@@ -213,7 +222,7 @@ public abstract class SimpleBuilders
// Note that rowBuilders.size() could include the static column so could be 1 off the really need capacity
// of the final PartitionUpdate, but as that's just a sizing hint, we'll live.
- PartitionUpdate update = new PartitionUpdate(metadata, key, columns.build(), rowBuilders.size());
+ PartitionUpdate.Builder update = new PartitionUpdate.Builder(metadata, key, columns.build(), rowBuilders.size());
update.addPartitionDeletion(partitionDeletion);
if (rangeBuilders != null)
@@ -222,10 +231,16 @@ public abstract class SimpleBuilders
update.add(builder.build());
}
+ if (rangeTombstones != null)
+ {
+ for (RangeTombstone rt : rangeTombstones)
+ update.add(rt);
+ }
+
for (RowBuilder builder : rowBuilders.values())
update.add(builder.build());
- return update;
+ return update.build();
}
public Mutation buildAsMutation()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9da0f6b..4469384 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1025,7 +1025,7 @@ public final class SystemKeyspace
UntypedResultSet.Row row = results.one();
Commit promised = row.has("in_progress_ballot")
- ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.regularAndStaticColumns(), 1))
+ ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate.Builder(metadata, key, metadata.regularAndStaticColumns(), 1).build())
: Commit.emptyCommit(key, metadata);
// either we have both a recently accepted ballot and update or we have neither
Commit accepted = row.has("proposal_version") && row.has("proposal")
@@ -1135,9 +1135,7 @@ public final class SystemKeyspace
public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
{
long timestamp = FBUtilities.timestampMicros();
- PartitionUpdate update = new PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
- Mutation mutation = new Mutation(update);
-
+ PartitionUpdate.Builder update = new PartitionUpdate.Builder(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.regularAndStaticColumns(), estimates.size());
// delete all previous values with a single range tombstone.
int nowInSec = FBUtilities.nowInSeconds();
update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
@@ -1153,8 +1151,7 @@ public final class SystemKeyspace
.add("mean_partition_size", values.right)
.build());
}
-
- mutation.apply();
+ new Mutation(update.build()).apply();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index ce185b6..2947222 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -26,7 +26,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
-import com.google.common.collect.*;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+
import org.apache.commons.lang3.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
@@ -249,7 +253,7 @@ public class CommitLogReplayer implements CommitLogReadHandler
// b) have already been flushed,
// or c) are part of a cf that was dropped.
// Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- Mutation newMutation = null;
+ Mutation.PartitionUpdateCollector newPUCollector = null;
for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
{
if (Schema.instance.getTableMetadata(update.metadata().id) == null)
@@ -259,17 +263,17 @@ public class CommitLogReplayer implements CommitLogReadHandler
// if it is the last known segment, if we are after the commit log segment position
if (commitLogReplayer.shouldReplay(update.metadata().id, new CommitLogPosition(segmentId, entryLocation)))
{
- if (newMutation == null)
- newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
- newMutation.add(update);
+ if (newPUCollector == null)
+ newPUCollector = new Mutation.PartitionUpdateCollector(mutation.getKeyspaceName(), mutation.key());
+ newPUCollector.add(update);
commitLogReplayer.replayedCount.incrementAndGet();
}
}
- if (newMutation != null)
+ if (newPUCollector != null)
{
- assert !newMutation.isEmpty();
+ assert !newPUCollector.isEmpty();
- Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
+ Keyspace.open(newPUCollector.getKeyspaceName()).apply(newPUCollector.build(), false, true, false);
commitLogReplayer.keyspacesReplayed.add(keyspace);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 7a0cefe..a549458 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.partitions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -60,37 +61,11 @@ public class PartitionUpdate extends AbstractBTreePartition
public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer();
- private final int createdAtInSec = FBUtilities.nowInSeconds();
-
- // Records whether this update is "built", i.e. if the build() method has been called, which
- // happens when the update is read. Further writing is then rejected though a manual call
- // to allowNewUpdates() allow new writes. We could make that more implicit but only triggers
- // really requires that so we keep it simple for now).
- private volatile boolean isBuilt;
- private boolean canReOpen = true;
-
- private Holder holder;
- private BTree.Builder<Row> rowBuilder;
- private MutableDeletionInfo deletionInfo;
-
- private final boolean canHaveShadowedData;
-
+ private final Holder holder;
+ private final DeletionInfo deletionInfo;
private final TableMetadata metadata;
- private PartitionUpdate(TableMetadata metadata,
- DecoratedKey key,
- RegularAndStaticColumns columns,
- MutableDeletionInfo deletionInfo,
- int initialRowCapacity,
- boolean canHaveShadowedData)
- {
- super(key);
- this.metadata = metadata;
- this.deletionInfo = deletionInfo;
- this.holder = new Holder(columns, BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
- this.canHaveShadowedData = canHaveShadowedData;
- rowBuilder = builder(initialRowCapacity);
- }
+ private final boolean canHaveShadowedData;
private PartitionUpdate(TableMetadata metadata,
DecoratedKey key,
@@ -102,29 +77,9 @@ public class PartitionUpdate extends AbstractBTreePartition
this.metadata = metadata;
this.holder = holder;
this.deletionInfo = deletionInfo;
- this.isBuilt = true;
this.canHaveShadowedData = canHaveShadowedData;
}
- public PartitionUpdate(TableMetadata metadata,
- DecoratedKey key,
- RegularAndStaticColumns columns,
- int initialRowCapacity)
- {
- this(metadata, key, columns, MutableDeletionInfo.live(), initialRowCapacity, true);
- }
-
- public PartitionUpdate(TableMetadata metadata,
- ByteBuffer key,
- RegularAndStaticColumns columns,
- int initialRowCapacity)
- {
- this(metadata,
- metadata.partitioner.decorateKey(key),
- columns,
- initialRowCapacity);
- }
-
/**
* Creates a empty immutable partition update.
*
@@ -329,30 +284,6 @@ public class PartitionUpdate extends AbstractBTreePartition
}
/**
- * Modify this update to set every timestamp for live data to {@code newTimestamp} and
- * every deletion timestamp to {@code newTimestamp - 1}.
- *
- * There is no reason to use that expect on the Paxos code path, where we need ensure that
- * anything inserted use the ballot timestamp (to respect the order of update decided by
- * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones
- * always win on timestamp equality and we don't want to delete our own insertions
- * (typically, when we overwrite a collection, we first set a complex deletion to delete the
- * previous collection before adding new elements. If we were to set that complex deletion
- * to the same timestamp that the new elements, it would delete those elements). And since
- * tombstones always wins on timestamp equality, using -1 guarantees our deletion will still
- * delete anything from a previous update.
- */
- public void updateAllTimestamp(long newTimestamp)
- {
- Holder holder = holder();
- deletionInfo.updateAllTimestamp(newTimestamp - 1);
- Object[] tree = BTree.<Row>transformAndFilter(holder.tree, (x) -> x.updateAllTimestamp(newTimestamp));
- Row staticRow = holder.staticRow.updateAllTimestamp(newTimestamp);
- EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.<Row>iterator(tree), deletionInfo);
- this.holder = new Holder(holder.columns, tree, deletionInfo, staticRow, newStats);
- }
-
- /**
* The number of "operations" contained in the update.
* <p>
* This is used by {@code Memtable} to approximate how much work this update does. In practice, this
@@ -401,7 +332,6 @@ public class PartitionUpdate extends AbstractBTreePartition
protected Holder holder()
{
- maybeBuild();
return holder;
}
@@ -411,50 +341,6 @@ public class PartitionUpdate extends AbstractBTreePartition
}
/**
- * If a partition update has been read (and is thus unmodifiable), a call to this method
- * makes the update modifiable again.
- * <p>
- * Please note that calling this method won't result in optimal behavior in the sense that
- * even if very little is added to the update after this call, the whole update will be sorted
- * again on read. This should thus be used sparingly (and if it turns that we end up using
- * this often, we should consider optimizing the behavior).
- */
- public synchronized void allowNewUpdates()
- {
- if (!canReOpen)
- throw new IllegalStateException("You cannot do more updates on collectCounterMarks has been called");
-
- // This is synchronized to make extra sure things work properly even if this is
- // called concurrently with sort() (which should be avoided in the first place, but
- // better safe than sorry).
- isBuilt = false;
- if (rowBuilder == null)
- rowBuilder = builder(16);
- }
-
- private BTree.Builder<Row> builder(int initialCapacity)
- {
- return BTree.<Row>builder(metadata().comparator, initialCapacity)
- .setQuickResolver((a, b) ->
- Rows.merge(a, b, createdAtInSec));
- }
-
- /**
- * Returns an iterator that iterates over the rows of this update in clustering order.
- * <p>
- * Note that this might trigger a sorting of the update, and as such the update will not
- * be modifiable anymore after this call.
- *
- * @return an iterator over the rows of this update.
- */
- @Override
- public Iterator<Row> iterator()
- {
- maybeBuild();
- return super.iterator();
- }
-
- /**
* Validates the data contained in this update.
*
* @throws org.apache.cassandra.serializers.MarshalException if some of the data contained in this update is corrupted.
@@ -476,8 +362,6 @@ public class PartitionUpdate extends AbstractBTreePartition
*/
public long maxTimestamp()
{
- maybeBuild();
-
long maxTimestamp = deletionInfo.maxTimestamp();
for (Row row : this)
{
@@ -509,11 +393,8 @@ public class PartitionUpdate extends AbstractBTreePartition
public List<CounterMark> collectCounterMarks()
{
assert metadata().isCounter();
- maybeBuild();
// We will take aliases on the rows of this update, and update them in-place. So we should be sure the
// update is now immutable for all intent and purposes.
- canReOpen = false;
-
List<CounterMark> marks = new ArrayList<>();
addMarksForRow(staticRow(), marks);
for (Row row : this)
@@ -521,7 +402,7 @@ public class PartitionUpdate extends AbstractBTreePartition
return marks;
}
- private void addMarksForRow(Row row, List<CounterMark> marks)
+ private static void addMarksForRow(Row row, List<CounterMark> marks)
{
for (Cell cell : row.cells())
{
@@ -530,109 +411,6 @@ public class PartitionUpdate extends AbstractBTreePartition
}
}
- private void assertNotBuilt()
- {
- if (isBuilt)
- throw new IllegalStateException("An update should not be written again once it has been read");
- }
-
- public void addPartitionDeletion(DeletionTime deletionTime)
- {
- assertNotBuilt();
- deletionInfo.add(deletionTime);
- }
-
- public void add(RangeTombstone range)
- {
- assertNotBuilt();
- deletionInfo.add(range, metadata().comparator);
- }
-
- /**
- * Adds a row to this update.
- *
- * There is no particular assumption made on the order of row added to a partition update. It is further
- * allowed to add the same row (more precisely, multiple row objects for the same clustering).
- *
- * Note however that the columns contained in the added row must be a subset of the columns used when
- * creating this update.
- *
- * @param row the row to add.
- */
- public void add(Row row)
- {
- if (row.isEmpty())
- return;
-
- assertNotBuilt();
-
- if (row.isStatic())
- {
- // this assert is expensive, and possibly of limited value; we should consider removing it
- // or introducing a new class of assertions for test purposes
- assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
- Row staticRow = holder.staticRow.isEmpty()
- ? row
- : Rows.merge(holder.staticRow, row, createdAtInSec);
- holder = new Holder(holder.columns, holder.tree, holder.deletionInfo, staticRow, holder.stats);
- }
- else
- {
- // this assert is expensive, and possibly of limited value; we should consider removing it
- // or introducing a new class of assertions for test purposes
- assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
- rowBuilder.add(row);
- }
- }
-
- private void maybeBuild()
- {
- if (isBuilt)
- return;
-
- build();
- }
-
- private synchronized void build()
- {
- if (isBuilt)
- return;
-
- Holder holder = this.holder;
- Object[] cur = holder.tree;
- Object[] add = rowBuilder.build();
- Object[] merged = BTree.<Row>merge(cur, add, metadata().comparator,
- UpdateFunction.Simple.of((a, b) -> Rows.merge(a, b, createdAtInSec)));
-
- assert deletionInfo == holder.deletionInfo;
- EncodingStats newStats = EncodingStats.Collector.collect(holder.staticRow, BTree.<Row>iterator(merged), deletionInfo);
-
- this.holder = new Holder(holder.columns, merged, holder.deletionInfo, holder.staticRow, newStats);
- rowBuilder = null;
- isBuilt = true;
- }
-
- @Override
- public String toString()
- {
- if (isBuilt)
- return super.toString();
-
- // We intentionally override AbstractBTreePartition#toString() to avoid iterating over the rows in the
- // partition, which can result in build() being triggered and lead to errors if the PartitionUpdate is later
- // modified.
-
- StringBuilder sb = new StringBuilder();
- sb.append(String.format("[%s] key=%s columns=%s",
- metadata.toString(),
- metadata.partitionKeyType.getString(partitionKey().getKey()),
- columns()));
-
- sb.append("\n deletionInfo=").append(deletionInfo);
- sb.append(" (not built)");
- return sb.toString();
- }
-
/**
* Creates a new simple partition update builder.
*
@@ -647,6 +425,11 @@ public class PartitionUpdate extends AbstractBTreePartition
return new SimpleBuilders.PartitionUpdateBuilder(metadata, partitionKeyValues);
}
+ public void validateIndexedColumns()
+ {
+ Keyspace.openAndGetStore(metadata()).indexManager.validate(this);
+ }
+
/**
* Interface for building partition updates geared towards human.
* <p>
@@ -712,6 +495,13 @@ public class PartitionUpdate extends AbstractBTreePartition
public RangeTombstoneBuilder addRangeTombstone();
/**
+ * Adds a new range tombstone to this update
+ *
+ * @return this builder
+ */
+ public SimpleBuilder addRangeTombstone(RangeTombstone rt);
+
+ /**
* Build the update represented by this builder.
*
* @return the built update.
@@ -892,4 +682,208 @@ public class PartitionUpdate extends AbstractBTreePartition
((BTreeRow)row).setValue(column, path, value);
}
}
+
+ /**
+ * Builder for PartitionUpdates
+ *
+ * This class is not thread safe, but the PartitionUpdate it produces is (since it is immutable).
+ */
+ public static class Builder
+ {
+ private final TableMetadata metadata;
+ private final DecoratedKey key;
+ private final MutableDeletionInfo deletionInfo;
+ private final boolean canHaveShadowedData;
+ private Object[] tree = BTree.empty();
+ private final BTree.Builder<Row> rowBuilder;
+ private final int createdAtInSec = FBUtilities.nowInSeconds();
+ private Row staticRow = Rows.EMPTY_STATIC_ROW;
+ private final RegularAndStaticColumns columns;
+ private boolean isBuilt = false;
+
+ public Builder(TableMetadata metadata,
+ DecoratedKey key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity,
+ boolean canHaveShadowedData)
+ {
+ this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, Rows.EMPTY_STATIC_ROW, MutableDeletionInfo.live(), BTree.empty());
+ }
+
+ private Builder(TableMetadata metadata,
+ DecoratedKey key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity,
+ boolean canHaveShadowedData,
+ Holder holder)
+ {
+ this(metadata, key, columns, initialRowCapacity, canHaveShadowedData, holder.staticRow, holder.deletionInfo, holder.tree);
+ }
+
+ private Builder(TableMetadata metadata,
+ DecoratedKey key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity,
+ boolean canHaveShadowedData,
+ Row staticRow,
+ DeletionInfo deletionInfo,
+ Object[] tree)
+ {
+ this.metadata = metadata;
+ this.key = key;
+ this.columns = columns;
+ this.rowBuilder = rowBuilder(initialRowCapacity);
+ this.canHaveShadowedData = canHaveShadowedData;
+ this.deletionInfo = deletionInfo.mutableCopy();
+ this.staticRow = staticRow;
+ this.tree = tree;
+ }
+
+ public Builder(TableMetadata metadata, DecoratedKey key, RegularAndStaticColumns columnDefinitions, int size)
+ {
+ this(metadata, key, columnDefinitions, size, true);
+ }
+
+ public Builder(PartitionUpdate base, int initialRowCapacity)
+ {
+ this(base.metadata, base.partitionKey, base.columns(), initialRowCapacity, base.canHaveShadowedData, base.holder);
+ }
+
+ public Builder(TableMetadata metadata,
+ ByteBuffer key,
+ RegularAndStaticColumns columns,
+ int initialRowCapacity)
+ {
+ this(metadata, metadata.partitioner.decorateKey(key), columns, initialRowCapacity, true);
+ }
+
+ /**
+ * Adds a row to this update.
+ *
+ * There is no particular assumption made on the order of row added to a partition update. It is further
+ * allowed to add the same row (more precisely, multiple row objects for the same clustering).
+ *
+ * Note however that the columns contained in the added row must be a subset of the columns used when
+ * creating this update.
+ *
+ * @param row the row to add.
+ */
+ public void add(Row row)
+ {
+ if (row.isEmpty())
+ return;
+
+ if (row.isStatic())
+ {
+ // this assert is expensive, and possibly of limited value; we should consider removing it
+ // or introducing a new class of assertions for test purposes
+ assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
+ staticRow = staticRow.isEmpty()
+ ? row
+ : Rows.merge(staticRow, row, createdAtInSec);
+ }
+ else
+ {
+ // this assert is expensive, and possibly of limited value; we should consider removing it
+ // or introducing a new class of assertions for test purposes
+ assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
+ rowBuilder.add(row);
+ }
+ }
+
+ public void addPartitionDeletion(DeletionTime deletionTime)
+ {
+ deletionInfo.add(deletionTime);
+ }
+
+ public void add(RangeTombstone range)
+ {
+ deletionInfo.add(range, metadata.comparator);
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return key;
+ }
+
+ public TableMetadata metadata()
+ {
+ return metadata;
+ }
+
+ public PartitionUpdate build()
+ {
+ // assert that we are not calling build() several times
+ assert !isBuilt : "A PartitionUpdate.Builder should only get built once";
+ Object[] add = rowBuilder.build();
+ Object[] merged = BTree.<Row>merge(tree, add, metadata.comparator,
+ UpdateFunction.Simple.of((a, b) -> Rows.merge(a, b, createdAtInSec)));
+
+ EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), deletionInfo);
+
+ isBuilt = true;
+ return new PartitionUpdate(metadata,
+ partitionKey(),
+ new Holder(columns,
+ merged,
+ deletionInfo,
+ staticRow,
+ newStats),
+ deletionInfo,
+ canHaveShadowedData);
+ }
+
+ public RegularAndStaticColumns columns()
+ {
+ return columns;
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return deletionInfo.getPartitionDeletion();
+ }
+
+ private BTree.Builder<Row> rowBuilder(int initialCapacity)
+ {
+ return BTree.<Row>builder(metadata.comparator, initialCapacity)
+ .setQuickResolver((a, b) ->
+ Rows.merge(a, b, createdAtInSec));
+ }
+ /**
+ * Modify this update to set every timestamp for live data to {@code newTimestamp} and
+ * every deletion timestamp to {@code newTimestamp - 1}.
+ *
+ * There is no reason to use that expect on the Paxos code path, where we need ensure that
+ * anything inserted use the ballot timestamp (to respect the order of update decided by
+ * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones
+ * always win on timestamp equality and we don't want to delete our own insertions
+ * (typically, when we overwrite a collection, we first set a complex deletion to delete the
+ * previous collection before adding new elements. If we were to set that complex deletion
+ * to the same timestamp that the new elements, it would delete those elements). And since
+ * tombstones always wins on timestamp equality, using -1 guarantees our deletion will still
+ * delete anything from a previous update.
+ */
+ public Builder updateAllTimestamp(long newTimestamp)
+ {
+ deletionInfo.updateAllTimestamp(newTimestamp - 1);
+ tree = BTree.<Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
+ staticRow = this.staticRow.updateAllTimestamp(newTimestamp);
+ return this;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Builder{" +
+ "metadata=" + metadata +
+ ", key=" + key +
+ ", deletionInfo=" + deletionInfo +
+ ", canHaveShadowedData=" + canHaveShadowedData +
+ ", createdAtInSec=" + createdAtInSec +
+ ", staticRow=" + staticRow +
+ ", columns=" + columns +
+ ", isBuilt=" + isBuilt +
+ '}';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index 7a1373c..298fcfd 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -21,7 +21,9 @@ import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
@@ -510,23 +512,23 @@ public class TableViews extends AbstractCollection<View>
return mutations;
}
- Map<DecoratedKey, Mutation> mutations = new HashMap<>();
+ Map<DecoratedKey, Mutation.PartitionUpdateCollector> mutations = new HashMap<>();
for (ViewUpdateGenerator generator : generators)
{
for (PartitionUpdate update : generator.generateViewUpdates())
{
DecoratedKey key = update.partitionKey();
- Mutation mutation = mutations.get(key);
- if (mutation == null)
+ Mutation.PartitionUpdateCollector collector = mutations.get(key);
+ if (collector == null)
{
- mutation = new Mutation(baseTableMetadata.keyspace, key);
- mutations.put(key, mutation);
+ collector = new Mutation.PartitionUpdateCollector(baseTableMetadata.keyspace, key);
+ mutations.put(key, collector);
}
- mutation.add(update);
+ collector.add(update);
}
generator.clear();
}
- return mutations.values();
+ return mutations.values().stream().map(Mutation.PartitionUpdateCollector::build).collect(Collectors.toList());
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index 794a6b7..73ca240 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.view;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.stream.Collectors;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
@@ -53,7 +54,7 @@ public class ViewUpdateGenerator
private final TableMetadata viewMetadata;
private final boolean baseEnforceStrictLiveness;
- private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>();
+ private final Map<DecoratedKey, PartitionUpdate.Builder> updates = new HashMap<>();
// Reused internally to build a new entry
private final ByteBuffer[] currentViewEntryPartitionKey;
@@ -143,7 +144,7 @@ public class ViewUpdateGenerator
*/
public Collection<PartitionUpdate> generateViewUpdates()
{
- return updates.values();
+ return updates.values().stream().map(PartitionUpdate.Builder::build).collect(Collectors.toList());
}
/**
@@ -566,14 +567,13 @@ public class ViewUpdateGenerator
return;
DecoratedKey partitionKey = makeCurrentPartitionKey();
- PartitionUpdate update = updates.get(partitionKey);
- if (update == null)
- {
- // We can't really know which columns of the view will be updated nor how many row will be updated for this key
- // so we rely on hopefully sane defaults.
- update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.regularAndStaticColumns(), 4);
- updates.put(partitionKey, update);
- }
+ // We can't really know which columns of the view will be updated nor how many row will be updated for this key
+ // so we rely on hopefully sane defaults.
+ PartitionUpdate.Builder update = updates.computeIfAbsent(partitionKey,
+ k -> new PartitionUpdate.Builder(viewMetadata,
+ partitionKey,
+ viewMetadata.regularAndStaticColumns(),
+ 4));
update.add(row);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 1fa5d8e..044a00b 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -115,7 +115,7 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
return maxGen;
}
- PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException
+ PartitionUpdate.Builder getUpdateFor(ByteBuffer key) throws IOException
{
return getUpdateFor(metadata.get().partitioner.decorateKey(key));
}
@@ -126,6 +126,6 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
* @param key they partition key for which the returned update will be.
* @return an update on partition {@code key} that is tied to this writer.
*/
- abstract PartitionUpdate getUpdateFor(DecoratedKey key) throws IOException;
+ abstract PartitionUpdate.Builder getUpdateFor(DecoratedKey key) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de7c24b3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index afb4461..369be12 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -68,16 +68,16 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
diskWriter.start();
}
- PartitionUpdate getUpdateFor(DecoratedKey key)
+ PartitionUpdate.Builder getUpdateFor(DecoratedKey key)
{
assert key != null;
-
- PartitionUpdate previous = buffer.get(key);
+ PartitionUpdate.Builder previous = buffer.get(key);
if (previous == null)
{
- previous = createPartitionUpdate(key);
- currentSize += PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion());
- previous.allowNewUpdates();
+ // todo: inefficient - we create and serialize a PU just to get its size, then recreate it
+ // todo: either allow PartitionUpdateBuilder to have .build() called several times or pre-calculate the size
+ currentSize += PartitionUpdate.serializer.serializedSize(createPartitionUpdateBuilder(key).build(), formatType.info.getLatestVersion().correspondingMessagingVersion());
+ previous = createPartitionUpdateBuilder(key);
buffer.put(key, previous);
}
return previous;
@@ -108,9 +108,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
}
}
- private PartitionUpdate createPartitionUpdate(DecoratedKey key)
+ private PartitionUpdate.Builder createPartitionUpdateBuilder(DecoratedKey key)
{
- return new PartitionUpdate(metadata.get(), key, columns, 4)
+ return new PartitionUpdate.Builder(metadata.get(), key, columns, 4)
{
@Override
public void add(Row row)
@@ -188,7 +188,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
}
//// typedef
- static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate> {}
+ static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate.Builder> {}
private class DiskWriter extends FastThreadLocalThread
{
@@ -206,8 +206,8 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
try (SSTableTxnWriter writer = createWriter())
{
- for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
- writer.append(entry.getValue().unfilteredIterator());
+ for (Map.Entry<DecoratedKey, PartitionUpdate.Builder> entry : b.entrySet())
+ writer.append(entry.getValue().build().unfilteredIterator());
writer.finish(false);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org