You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/23 21:08:17 UTC
[1/3] git commit: Merge groupable mutations in
TriggerExecutor#execute()
Repository: cassandra
Updated Branches:
refs/heads/trunk f22e775fb -> ec7206ce6
Merge groupable mutations in TriggerExecutor#execute()
patch by Aleksey Yeschenko and Sergio Bossa for CASSANDRA-7047
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1eb74ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1eb74ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1eb74ce
Branch: refs/heads/trunk
Commit: c1eb74ce47988c1e75d20ccb9a0320dd305c4b1c
Parents: 4e4d7bb
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 21:30:36 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 21:30:36 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/RowMutation.java | 5 +
.../apache/cassandra/service/StorageProxy.java | 23 +-
.../cassandra/triggers/TriggerExecutor.java | 71 ++--
.../cassandra/triggers/TriggerExecutorTest.java | 332 +++++++++++++++++++
.../apache/cassandra/triggers/TriggersTest.java | 39 +--
6 files changed, 417 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dbed949..68c335d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
* Queries on compact tables can return more rows that requested (CASSANDRA-7052)
* USING TIMESTAMP for batches does not work (CASSANDRA-7053)
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
+ * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
Merged from 1.2:
* Fix batchlog to account for CF truncation records (CASSANDRA-6999)
* Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 49ee2c5..223225e 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -75,6 +75,11 @@ public class RowMutation implements IMutation
this(cf.metadata().ksName, key, cf);
}
+ public RowMutation copy()
+ {
+ return new RowMutation(keyspaceName, key, new HashMap<>(modifications));
+ }
+
public String getKeyspaceName()
{
return keyspaceName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8196352..14d5ee2 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -519,21 +519,20 @@ public class StorageProxy implements StorageProxyMBean
}
}
- public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+ @SuppressWarnings("unchecked")
+ public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
+ ConsistencyLevel consistencyLevel,
+ boolean mutateAtomically)
throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
{
- Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations);
- if (mutateAtomically || tmutations != null)
- {
- Collection<RowMutation> allMutations = new ArrayList<>((Collection<RowMutation>) mutations);
- if (tmutations != null)
- allMutations.addAll(tmutations);
- StorageProxy.mutateAtomically(allMutations, consistencyLevel);
- }
+ Collection<RowMutation> augmented = TriggerExecutor.instance.execute(mutations);
+
+ if (augmented != null)
+ mutateAtomically(augmented, consistencyLevel);
+ else if (mutateAtomically)
+ mutateAtomically((Collection<RowMutation>) mutations, consistencyLevel);
else
- {
- StorageProxy.mutate(mutations, consistencyLevel);
- }
+ mutate(mutations, consistencyLevel);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 8ccf937..988c6a7 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.triggers;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -38,6 +36,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.utils.Pair;
public class TriggerExecutor
{
@@ -68,7 +67,7 @@ public class TriggerExecutor
public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
{
List<RowMutation> intermediate = executeInternal(key, updates);
- if (intermediate == null)
+ if (intermediate == null || intermediate.isEmpty())
return updates;
validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
@@ -80,30 +79,62 @@ public class TriggerExecutor
return updates;
}
- public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
+ public Collection<RowMutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
{
boolean hasCounters = false;
- Collection<RowMutation> tmutations = null;
- for (IMutation mutation : updates)
+ List<RowMutation> augmentedMutations = null;
+
+ for (IMutation mutation : mutations)
{
+ if (mutation instanceof CounterMutation)
+ hasCounters = true;
+
for (ColumnFamily cf : mutation.getColumnFamilies())
{
- List<RowMutation> intermediate = executeInternal(mutation.key(), cf);
- if (intermediate == null)
+ List<RowMutation> augmentations = executeInternal(mutation.key(), cf);
+ if (augmentations == null || augmentations.isEmpty())
continue;
- validate(intermediate);
- if (tmutations == null)
- tmutations = intermediate;
- else
- tmutations.addAll(intermediate);
+ validate(augmentations);
+
+ if (augmentedMutations == null)
+ augmentedMutations = new LinkedList<>();
+ augmentedMutations.addAll(augmentations);
}
- if (mutation instanceof CounterMutation)
- hasCounters = true;
}
- if (tmutations != null && hasCounters)
+
+ if (augmentedMutations == null)
+ return null;
+
+ if (hasCounters)
throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically.");
- return tmutations;
+
+ @SuppressWarnings("unchecked")
+ Collection<RowMutation> originalMutations = (Collection<RowMutation>) mutations;
+
+ return mergeMutations(Iterables.concat(originalMutations, augmentedMutations));
+ }
+
+ private Collection<RowMutation> mergeMutations(Iterable<RowMutation> mutations)
+ {
+ Map<Pair<String, ByteBuffer>, RowMutation> groupedMutations = new HashMap<>();
+
+ for (RowMutation mutation : mutations)
+ {
+ Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
+ RowMutation current = groupedMutations.get(key);
+ if (current == null)
+ {
+ // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
+ groupedMutations.put(key, mutation.copy());
+ }
+ else
+ {
+ current.addAll(mutation);
+ }
+ }
+
+ return groupedMutations.values();
}
private void validateForSinglePartition(AbstractType<?> keyValidator,
@@ -141,7 +172,7 @@ public class TriggerExecutor
*/
private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
{
- Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
+ Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
if (triggers.isEmpty())
return null;
List<RowMutation> tmutations = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
new file mode 100644
index 0000000..ab7f7c4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.triggers;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.TriggerDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.junit.Assert.*;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class TriggerExecutorTest
+{
+ @Test
+ public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+ ColumnFamily mutated = TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ assertEquals(bytes("v1"), mutated.getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutated.getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test(expected = InvalidRequestException.class)
+ public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+ TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ }
+
+ @Test(expected = InvalidRequestException.class)
+ public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+ TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ }
+
+ @Test
+ public void noTriggerMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", NoOpTrigger.class.getName()));
+ RowMutation rm = new RowMutation(bytes("k1"), makeCf(metadata, "v1", null));
+ assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm)));
+ }
+
+ @Test
+ public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfPartialTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(2, mutatedCFs.size());
+
+ Collections.sort(mutatedCFs, new CfComparator());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(2, mutatedCFs.size());
+
+ Collections.sort(mutatedCFs, new CfComparator());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentKsTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(4, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(2).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(3).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+ ColumnFamily cf = makeCf(metadata, "v1", null);
+ RowMutation rm = new RowMutation(UTF8Type.instance.fromString("k1"), cf);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ assertEquals(bytes("k1"), tmutations.get(0).key());
+ assertEquals(bytes("otherKey"), tmutations.get(1).key());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ private static CFMetaData makeCfMetaData(String ks, String cf, TriggerDefinition trigger)
+ {
+
+ CFMetaData metadata = new CFMetaData(ks, cf, ColumnFamilyType.Standard, CompositeType.getInstance(UTF8Type.instance));
+
+ metadata.keyValidator(UTF8Type.instance);
+ metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(UTF8Type.instance.fromString("pkey"),
+ UTF8Type.instance,
+ null));
+ metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c1"),
+ UTF8Type.instance,
+ 0));
+ metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c2"),
+ UTF8Type.instance,
+ 0));
+ try
+ {
+ if (trigger != null)
+ metadata.addTriggerDefinition(trigger);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ return metadata.rebuild();
+ }
+
+ private static ColumnFamily makeCf(CFMetaData metadata, String columnValue1, String columnValue2)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+
+ if (columnValue1 != null)
+ cf.addColumn(new Column(getColumnName(metadata, "c1"), bytes(columnValue1)));
+
+ if (columnValue2 != null)
+ cf.addColumn(new Column(getColumnName(metadata, "c2"), bytes(columnValue2)));
+
+ return cf;
+ }
+
+ private static ByteBuffer getColumnName(CFMetaData metadata, String stringName)
+ {
+ return ((CompositeType) metadata.comparator).builder().add(bytes(stringName)).build();
+ }
+
+ public static class NoOpTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ return null;
+ }
+ }
+
+ public static class SameKeySameCfTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeySameCfPartialTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ if (!key.equals(bytes("k2")))
+ return null;
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeyDifferentCfTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData(update.metadata().ksName, "otherCf", null));
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeyDifferentKsTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData("otherKs", "otherCf", null));
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class DifferentKeyTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(cf.metadata().ksName, bytes("otherKey"), cf));
+ }
+ }
+
+ private static class RmComparator implements Comparator<IMutation>
+ {
+ public int compare(IMutation m1, IMutation m2)
+ {
+ int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName());
+ return cmp != 0 ? cmp : m1.key().compareTo(m2.key());
+ }
+ }
+
+ private static class CfComparator implements Comparator<ColumnFamily>
+ {
+ public int compare(ColumnFamily cf1, ColumnFamily cf2)
+ {
+ return cf1.metadata().cfName.compareTo(cf2.metadata().cfName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 5b9b27d..bda13ff 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -50,6 +50,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
public class TriggersTest extends SchemaLoader
{
private static boolean triggerCreated = false;
@@ -126,7 +128,7 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.insert(ByteBufferUtil.bytes(2),
+ client.insert(bytes(2),
new ColumnParent(cfName),
getColumnForInsert("v1", 2),
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -147,7 +149,7 @@ public class TriggersTest extends SchemaLoader
cosc.setColumn(getColumnForInsert("v1", 3));
mutation.setColumn_or_supercolumn(cosc);
client.batch_mutate(
- Collections.singletonMap(ByteBufferUtil.bytes(3),
+ Collections.singletonMap(bytes(3),
Collections.singletonMap(cfName,
Collections.singletonList(mutation))),
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -183,9 +185,9 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.cas(ByteBufferUtil.bytes(6),
+ client.cas(bytes(6),
cfName,
- Collections.EMPTY_LIST,
+ Collections.<org.apache.cassandra.thrift.Column>emptyList(),
Collections.singletonList(getColumnForInsert("v1", 6)),
org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -241,9 +243,9 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.cas(ByteBufferUtil.bytes(9),
+ client.cas(bytes(9),
cf,
- Collections.EMPTY_LIST,
+ Collections.<org.apache.cassandra.thrift.Column>emptyList(),
Collections.singletonList(getColumnForInsert("v1", 9)),
org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -266,9 +268,9 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.cas(ByteBufferUtil.bytes(10),
+ client.cas(bytes(10),
cf,
- Collections.EMPTY_LIST,
+ Collections.<org.apache.cassandra.thrift.Column>emptyList(),
Collections.singletonList(getColumnForInsert("v1", 10)),
org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -310,7 +312,7 @@ public class TriggersTest extends SchemaLoader
{
org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.fromString(columnName));
- column.setValue(ByteBufferUtil.bytes(value));
+ column.setValue(bytes(value));
column.setTimestamp(System.currentTimeMillis());
return column;
}
@@ -321,10 +323,8 @@ public class TriggersTest extends SchemaLoader
{
ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
- ByteBufferUtil.bytes(999)));
- RowMutation rm = new RowMutation(ksName, key);
- rm.add(extraUpdate);
- return Collections.singletonList(rm);
+ bytes(999)));
+ return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
}
}
@@ -334,12 +334,10 @@ public class TriggersTest extends SchemaLoader
{
ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
- ByteBufferUtil.bytes(999)));
+ bytes(999)));
int newKey = ByteBufferUtil.toInt(key) + 1000;
- RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey));
- rm.add(extraUpdate);
- return Collections.singletonList(rm);
+ return Collections.singletonList(new RowMutation(ksName, bytes(newKey), extraUpdate));
}
}
@@ -349,11 +347,8 @@ public class TriggersTest extends SchemaLoader
{
ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
- ByteBufferUtil.bytes(999)));
-
- RowMutation rm = new RowMutation(ksName, key);
- rm.add(extraUpdate);
- return Collections.singletonList(rm);
+ bytes(999)));
+ return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
}
}
}
[2/3] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/service/StorageProxy.java
src/java/org/apache/cassandra/triggers/TriggerExecutor.java
test/unit/org/apache/cassandra/triggers/TriggersTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f521227
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f521227
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f521227
Branch: refs/heads/trunk
Commit: 1f5212278378bda5623c48eaef39f4a54aef2e49
Parents: 4e95953 c1eb74c
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 22:06:48 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 22:06:48 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Mutation.java | 5 +
.../apache/cassandra/service/StorageProxy.java | 23 +-
.../cassandra/triggers/TriggerExecutor.java | 71 ++--
.../cassandra/triggers/TriggerExecutorTest.java | 336 +++++++++++++++++++
.../apache/cassandra/triggers/TriggersTest.java | 22 +-
6 files changed, 410 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d32a107,68c335d..295eb78
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -107,10 -64,6 +107,11 @@@ Merged from 2.0
(CASSANDRA-6906)
* Fix SSTable not released if stream session fails (CASSANDRA-6818)
* Avoid build failure due to ANTLR timeout (CASSANDRA-6991)
+ * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
+ * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
+ * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
+ * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
++ * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
Merged from 1.2:
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
* add extra SSL cipher suites (CASSANDRA-6613)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Mutation.java
index 47c990f,0000000..6fae9b0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@@ -1,342 -1,0 +1,347 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+// TODO convert this to a Builder pattern instead of encouraging M.add directly,
+// which is less-efficient since we have to keep a mutable HashMap around
+public class Mutation implements IMutation
+{
+ public static final MutationSerializer serializer = new MutationSerializer();
+
+ public static final String FORWARD_TO = "FWD_TO";
+ public static final String FORWARD_FROM = "FWD_FRM";
+
+ // todo this is redundant
+ // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test
+ private final String keyspaceName;
+
+ private final ByteBuffer key;
+ // map of column family id to mutations for that column family.
+ private final Map<UUID, ColumnFamily> modifications;
+
+ public Mutation(String keyspaceName, ByteBuffer key)
+ {
+ this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
+ }
+
+ public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
+ {
+ this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
+ }
+
+ public Mutation(String keyspaceName, Row row)
+ {
+ this(keyspaceName, row.key.key, row.cf);
+ }
+
+ protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
+ {
+ this.keyspaceName = keyspaceName;
+ this.key = key;
+ this.modifications = modifications;
+ }
+
+ public Mutation(ByteBuffer key, ColumnFamily cf)
+ {
+ this(cf.metadata().ksName, key, cf);
+ }
+
++ public Mutation copy()
++ {
++ return new Mutation(keyspaceName, key, new HashMap<>(modifications));
++ }
++
+ public String getKeyspaceName()
+ {
+ return keyspaceName;
+ }
+
+ public Collection<UUID> getColumnFamilyIds()
+ {
+ return modifications.keySet();
+ }
+
+ public ByteBuffer key()
+ {
+ return key;
+ }
+
+ public Collection<ColumnFamily> getColumnFamilies()
+ {
+ return modifications.values();
+ }
+
+ public ColumnFamily getColumnFamily(UUID cfId)
+ {
+ return modifications.get(cfId);
+ }
+
+ /*
+ * Specify a column family name and the corresponding column
+ * family object.
+ * param @ cf - column family name
+ * param @ columnFamily - the column family.
+ */
+ public void add(ColumnFamily columnFamily)
+ {
+ assert columnFamily != null;
+ ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily);
+ if (prev != null)
+ // developer error
+ throw new IllegalArgumentException("ColumnFamily " + columnFamily + " already has modifications in this mutation: " + prev);
+ }
+
+ /**
+ * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary.
+ */
+ public ColumnFamily addOrGet(String cfName)
+ {
+ return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName));
+ }
+
+ public ColumnFamily addOrGet(CFMetaData cfm)
+ {
+ ColumnFamily cf = modifications.get(cfm.cfId);
+ if (cf == null)
+ {
+ cf = ArrayBackedSortedColumns.factory.create(cfm);
+ modifications.put(cfm.cfId, cf);
+ }
+ return cf;
+ }
+
+ public boolean isEmpty()
+ {
+ return modifications.isEmpty();
+ }
+
+ public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive)
+ {
+ addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
+ }
+
+ public void addCounter(String cfName, CellName name, long value)
+ {
+ addOrGet(cfName).addCounter(name, value);
+ }
+
+ public void add(String cfName, CellName name, ByteBuffer value, long timestamp)
+ {
+ add(cfName, name, value, timestamp, 0);
+ }
+
+ public void delete(String cfName, long timestamp)
+ {
+ int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+ addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
+ }
+
+ public void delete(String cfName, CellName name, long timestamp)
+ {
+ int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+ addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
+ }
+
+ public void deleteRange(String cfName, Composite start, Composite end, long timestamp)
+ {
+ int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+ addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
+ }
+
+ public void addAll(IMutation m)
+ {
+ if (!(m instanceof Mutation))
+ throw new IllegalArgumentException();
+
+ Mutation mutation = (Mutation)m;
+ if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key))
+ throw new IllegalArgumentException();
+
+ for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
+ {
+ // It's slighty faster to assume the key wasn't present and fix if
+ // not in the case where it wasn't there indeed.
+ ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue());
+ if (cf != null)
+ entry.getValue().addAll(cf);
+ }
+ }
+
+ /*
+ * This is equivalent to calling commit. Applies the changes to
+ * to the keyspace that is obtained by calling Keyspace.open().
+ */
+ public void apply()
+ {
+ Keyspace ks = Keyspace.open(keyspaceName);
+ ks.apply(this, ks.metadata.durableWrites);
+ }
+
+ public void applyUnsafe()
+ {
+ Keyspace.open(keyspaceName).apply(this, false);
+ }
+
+ public MessageOut<Mutation> createMessage()
+ {
+ return createMessage(MessagingService.Verb.MUTATION);
+ }
+
+ public MessageOut<Mutation> createMessage(MessagingService.Verb verb)
+ {
+ return new MessageOut<>(verb, this, serializer);
+ }
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getWriteRpcTimeout();
+ }
+
+ public String toString()
+ {
+ return toString(false);
+ }
+
+ public String toString(boolean shallow)
+ {
+ StringBuilder buff = new StringBuilder("Mutation(");
+ buff.append("keyspace='").append(keyspaceName).append('\'');
+ buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
+ buff.append(", modifications=[");
+ if (shallow)
+ {
+ List<String> cfnames = new ArrayList<String>(modifications.size());
+ for (UUID cfid : modifications.keySet())
+ {
+ CFMetaData cfm = Schema.instance.getCFMetaData(cfid);
+ cfnames.add(cfm == null ? "-dropped-" : cfm.cfName);
+ }
+ buff.append(StringUtils.join(cfnames, ", "));
+ }
+ else
+ buff.append(StringUtils.join(modifications.values(), ", "));
+ return buff.append("])").toString();
+ }
+
+ public Mutation without(UUID cfId)
+ {
+ Mutation mutation = new Mutation(keyspaceName, key);
+ for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+ if (!entry.getKey().equals(cfId))
+ mutation.add(entry.getValue());
+ return mutation;
+ }
+
+ public static class MutationSerializer implements IVersionedSerializer<Mutation>
+ {
+ public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
+ {
+ if (version < MessagingService.VERSION_20)
+ out.writeUTF(mutation.getKeyspaceName());
+
+ ByteBufferUtil.writeWithShortLength(mutation.key(), out);
+
+ /* serialize the modifications in the mutation */
+ int size = mutation.modifications.size();
+ out.writeInt(size);
+ assert size > 0;
+ for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
+ ColumnFamily.serializer.serialize(entry.getValue(), out, version);
+ }
+
+ public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
+ {
+ String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
+ if (version < MessagingService.VERSION_20)
+ keyspaceName = in.readUTF();
+
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+ int size = in.readInt();
+ assert size > 0;
+
+ Map<UUID, ColumnFamily> modifications;
+ if (size == 1)
+ {
+ ColumnFamily cf = deserializeOneCf(in, version, flag);
+ modifications = Collections.singletonMap(cf.id(), cf);
+ keyspaceName = cf.metadata().ksName;
+ }
+ else
+ {
+ modifications = new HashMap<UUID, ColumnFamily>();
+ for (int i = 0; i < size; ++i)
+ {
+ ColumnFamily cf = deserializeOneCf(in, version, flag);
+ modifications.put(cf.id(), cf);
+ keyspaceName = cf.metadata().ksName;
+ }
+ }
+
+ return new Mutation(keyspaceName, key, modifications);
+ }
+
+ private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
+ {
+ ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
+ // We don't allow Mutation with null column family, so we should never get null back.
+ assert cf != null;
+ return cf;
+ }
+
+ public Mutation deserialize(DataInput in, int version) throws IOException
+ {
+ return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
+ }
+
+ public long serializedSize(Mutation mutation, int version)
+ {
+ TypeSizes sizes = TypeSizes.NATIVE;
+ int size = 0;
+
+ if (version < MessagingService.VERSION_20)
+ size += sizes.sizeof(mutation.getKeyspaceName());
+
+ int keySize = mutation.key().remaining();
+ size += sizes.sizeof((short) keySize) + keySize;
+
+ size += sizes.sizeof(mutation.modifications.size());
+ for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet())
+ size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version);
+
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index d8c5813,14d5ee2..269d68f
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -522,21 -519,20 +522,20 @@@ public class StorageProxy implements St
}
}
- public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+ @SuppressWarnings("unchecked")
+ public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
+ ConsistencyLevel consistencyLevel,
+ boolean mutateAtomically)
throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
{
- Collection<Mutation> tmutations = TriggerExecutor.instance.execute(mutations);
- if (mutateAtomically || tmutations != null)
- {
- Collection<Mutation> allMutations = new ArrayList<>((Collection<Mutation>) mutations);
- if (tmutations != null)
- allMutations.addAll(tmutations);
- StorageProxy.mutateAtomically(allMutations, consistencyLevel);
- }
- Collection<RowMutation> augmented = TriggerExecutor.instance.execute(mutations);
++ Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
+
+ if (augmented != null)
+ mutateAtomically(augmented, consistencyLevel);
+ else if (mutateAtomically)
- mutateAtomically((Collection<RowMutation>) mutations, consistencyLevel);
++ mutateAtomically((Collection<Mutation>) mutations, consistencyLevel);
else
- {
- StorageProxy.mutate(mutations, consistencyLevel);
- }
+ mutate(mutations, consistencyLevel);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 8a8c51d,988c6a7..4416a57
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@@ -35,6 -35,8 +33,7 @@@ import org.apache.cassandra.db.composit
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.HeapAllocator;
+ import org.apache.cassandra.utils.Pair;
public class TriggerExecutor
{
@@@ -64,8 -66,8 +63,8 @@@
public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
{
- List<RowMutation> intermediate = executeInternal(key, updates);
+ List<Mutation> intermediate = executeInternal(key, updates);
- if (intermediate == null)
+ if (intermediate == null || intermediate.isEmpty())
return updates;
validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
@@@ -80,30 -79,62 +79,62 @@@
return updates;
}
- public Collection<Mutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
- public Collection<RowMutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
++ public Collection<Mutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
{
boolean hasCounters = false;
- Collection<Mutation> tmutations = null;
- for (IMutation mutation : updates)
- List<RowMutation> augmentedMutations = null;
++ List<Mutation> augmentedMutations = null;
+
+ for (IMutation mutation : mutations)
{
+ if (mutation instanceof CounterMutation)
+ hasCounters = true;
+
for (ColumnFamily cf : mutation.getColumnFamilies())
{
- List<Mutation> intermediate = executeInternal(mutation.key(), cf);
- if (intermediate == null)
- List<RowMutation> augmentations = executeInternal(mutation.key(), cf);
++ List<Mutation> augmentations = executeInternal(mutation.key(), cf);
+ if (augmentations == null || augmentations.isEmpty())
continue;
- validate(intermediate);
- if (tmutations == null)
- tmutations = intermediate;
- else
- tmutations.addAll(intermediate);
+ validate(augmentations);
+
+ if (augmentedMutations == null)
+ augmentedMutations = new LinkedList<>();
+ augmentedMutations.addAll(augmentations);
}
- if (mutation instanceof CounterMutation)
- hasCounters = true;
}
- if (tmutations != null && hasCounters)
+
+ if (augmentedMutations == null)
+ return null;
+
+ if (hasCounters)
throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically.");
- return tmutations;
+
+ @SuppressWarnings("unchecked")
- Collection<RowMutation> originalMutations = (Collection<RowMutation>) mutations;
++ Collection<Mutation> originalMutations = (Collection<Mutation>) mutations;
+
+ return mergeMutations(Iterables.concat(originalMutations, augmentedMutations));
+ }
+
- private Collection<RowMutation> mergeMutations(Iterable<RowMutation> mutations)
++ private Collection<Mutation> mergeMutations(Iterable<Mutation> mutations)
+ {
- Map<Pair<String, ByteBuffer>, RowMutation> groupedMutations = new HashMap<>();
++ Map<Pair<String, ByteBuffer>, Mutation> groupedMutations = new HashMap<>();
+
- for (RowMutation mutation : mutations)
++ for (Mutation mutation : mutations)
+ {
+ Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
- RowMutation current = groupedMutations.get(key);
++ Mutation current = groupedMutations.get(key);
+ if (current == null)
+ {
+ // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
+ groupedMutations.put(key, mutation.copy());
+ }
+ else
+ {
+ current.addAll(mutation);
+ }
+ }
+
+ return groupedMutations.values();
}
private void validateForSinglePartition(AbstractType<?> keyValidator,
@@@ -141,12 -170,12 +172,12 @@@
* Switch class loader before using the triggers for the column family, if
* not loaded them with the custom class loader.
*/
- private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
+ private List<Mutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
{
- Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
+ Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
if (triggers.isEmpty())
return null;
- List<RowMutation> tmutations = Lists.newLinkedList();
+ List<Mutation> tmutations = Lists.newLinkedList();
Thread.currentThread().setContextClassLoader(customClassLoader);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
index 0000000,ab7f7c4..b0c45b9
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@@ -1,0 -1,332 +1,336 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.triggers;
+
+ import java.nio.ByteBuffer;
+ import java.util.*;
+ import org.junit.Test;
+
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.TriggerDefinition;
+ import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.composites.CellName;
+ import org.apache.cassandra.db.marshal.CompositeType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+
+ import static org.junit.Assert.*;
+
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+ public class TriggerExecutorTest
+ {
+ @Test
+ public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+ ColumnFamily mutated = TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ assertEquals(bytes("v1"), mutated.getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutated.getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test(expected = InvalidRequestException.class)
+ public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+ TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ }
+
+ @Test(expected = InvalidRequestException.class)
+ public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+ TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ }
+
+ @Test
+ public void noTriggerMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", NoOpTrigger.class.getName()));
- RowMutation rm = new RowMutation(bytes("k1"), makeCf(metadata, "v1", null));
++ Mutation rm = new Mutation(bytes("k1"), makeCf(metadata, "v1", null));
+ assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm)));
+ }
+
+ @Test
+ public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
- RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
- RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++ Mutation rm1 = new Mutation(bytes("k1"), cf1);
++ Mutation rm2 = new Mutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfPartialTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
- RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
- RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++ Mutation rm1 = new Mutation(bytes("k1"), cf1);
++ Mutation rm2 = new Mutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
- RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
- RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++ Mutation rm1 = new Mutation(bytes("k1"), cf1);
++ Mutation rm2 = new Mutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(2, mutatedCFs.size());
+
+ Collections.sort(mutatedCFs, new CfComparator());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(2, mutatedCFs.size());
+
+ Collections.sort(mutatedCFs, new CfComparator());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentKsTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
- RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
- RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++ Mutation rm1 = new Mutation(bytes("k1"), cf1);
++ Mutation rm2 = new Mutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(4, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(2).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(3).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+ ColumnFamily cf = makeCf(metadata, "v1", null);
- RowMutation rm = new RowMutation(UTF8Type.instance.fromString("k1"), cf);
++ Mutation rm = new Mutation(UTF8Type.instance.fromString("k1"), cf);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ assertEquals(bytes("k1"), tmutations.get(0).key());
+ assertEquals(bytes("otherKey"), tmutations.get(1).key());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ private static CFMetaData makeCfMetaData(String ks, String cf, TriggerDefinition trigger)
+ {
+
- CFMetaData metadata = new CFMetaData(ks, cf, ColumnFamilyType.Standard, CompositeType.getInstance(UTF8Type.instance));
++ CFMetaData metadata = CFMetaData.sparseCFMetaData(ks, cf, CompositeType.getInstance(UTF8Type.instance));
+
+ metadata.keyValidator(UTF8Type.instance);
- metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(UTF8Type.instance.fromString("pkey"),
++ metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(metadata,
++ UTF8Type.instance.fromString("pkey"),
+ UTF8Type.instance,
+ null));
- metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c1"),
++ metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(metadata,
++ UTF8Type.instance.fromString("c1"),
+ UTF8Type.instance,
+ 0));
- metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c2"),
++ metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(metadata,
++ UTF8Type.instance.fromString("c2"),
+ UTF8Type.instance,
+ 0));
+ try
+ {
+ if (trigger != null)
+ metadata.addTriggerDefinition(trigger);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ return metadata.rebuild();
+ }
+
+ private static ColumnFamily makeCf(CFMetaData metadata, String columnValue1, String columnValue2)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+
+ if (columnValue1 != null)
- cf.addColumn(new Column(getColumnName(metadata, "c1"), bytes(columnValue1)));
++ cf.addColumn(new Cell(getColumnName(metadata, "c1"), bytes(columnValue1)));
+
+ if (columnValue2 != null)
- cf.addColumn(new Column(getColumnName(metadata, "c2"), bytes(columnValue2)));
++ cf.addColumn(new Cell(getColumnName(metadata, "c2"), bytes(columnValue2)));
+
+ return cf;
+ }
+
- private static ByteBuffer getColumnName(CFMetaData metadata, String stringName)
++ private static CellName getColumnName(CFMetaData metadata, String stringName)
+ {
- return ((CompositeType) metadata.comparator).builder().add(bytes(stringName)).build();
++ return metadata.comparator.makeCellName(stringName);
+ }
+
+ public static class NoOpTrigger implements ITrigger
+ {
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ return null;
+ }
+ }
+
+ public static class SameKeySameCfTrigger implements ITrigger
+ {
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
- cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
- return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
++ cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++ return Collections.singletonList(new Mutation(update.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeySameCfPartialTrigger implements ITrigger
+ {
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ if (!key.equals(bytes("k2")))
+ return null;
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
- cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
- return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
++ cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++ return Collections.singletonList(new Mutation(update.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeyDifferentCfTrigger implements ITrigger
+ {
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData(update.metadata().ksName, "otherCf", null));
- cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
- return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
++ cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++ return Collections.singletonList(new Mutation(cf.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeyDifferentKsTrigger implements ITrigger
+ {
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData("otherKs", "otherCf", null));
- cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
- return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
++ cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++ return Collections.singletonList(new Mutation(cf.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class DifferentKeyTrigger implements ITrigger
+ {
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
- cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
- return Collections.singletonList(new RowMutation(cf.metadata().ksName, bytes("otherKey"), cf));
++ cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++ return Collections.singletonList(new Mutation(cf.metadata().ksName, bytes("otherKey"), cf));
+ }
+ }
+
+ private static class RmComparator implements Comparator<IMutation>
+ {
+ public int compare(IMutation m1, IMutation m2)
+ {
+ int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName());
+ return cmp != 0 ? cmp : m1.key().compareTo(m2.key());
+ }
+ }
+
+ private static class CfComparator implements Comparator<ColumnFamily>
+ {
+ public int compare(ColumnFamily cf1, ColumnFamily cf2)
+ {
+ return cf1.metadata().cfName.compareTo(cf2.metadata().cfName);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 79133e2,bda13ff..28464e5
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@@ -313,43 -319,36 +313,33 @@@ public class TriggersTest extends Schem
public static class TestTrigger implements ITrigger
{
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
- extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")),
- bytes(999)));
- Mutation mutation = new Mutation(ksName, key);
- mutation.add(extraUpdate);
- return Collections.singletonList(mutation);
- extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
- bytes(999)));
- return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
++ extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")), bytes(999)));
++ return Collections.singletonList(new Mutation(ksName, key, extraUpdate));
}
}
public static class CrossPartitionTrigger implements ITrigger
{
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
- extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")),
- bytes(999)));
- extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
- bytes(999)));
++ extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")), bytes(999)));
- int newKey = ByteBufferUtil.toInt(key) + 1000;
- return Collections.singletonList(new RowMutation(ksName, bytes(newKey), extraUpdate));
+ int newKey = toInt(key) + 1000;
- Mutation mutation = new Mutation(ksName, bytes(newKey));
- mutation.add(extraUpdate);
- return Collections.singletonList(mutation);
++ return Collections.singletonList(new Mutation(ksName, bytes(newKey), extraUpdate));
}
}
public static class CrossTableTrigger implements ITrigger
{
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
- extraUpdate.addColumn(new Cell(extraUpdate.metadata().comparator.makeCellName(bytes("v2")),
- bytes(999)));
-
- Mutation mutation = new Mutation(ksName, key);
- mutation.add(extraUpdate);
- return Collections.singletonList(mutation);
- extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
- bytes(999)));
- return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
++ extraUpdate.addColumn(new Cell(extraUpdate.metadata().comparator.makeCellName(bytes("v2")), bytes(999)));
++ return Collections.singletonList(new Mutation(ksName, key, extraUpdate));
}
}
}
[3/3] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec7206ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec7206ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec7206ce
Branch: refs/heads/trunk
Commit: ec7206ce6a50d2ca03ac23f8ba6ec8eea9cfe6d8
Parents: f22e775 1f52122
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 22:07:10 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 22:07:10 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Mutation.java | 5 +
.../apache/cassandra/service/StorageProxy.java | 23 +-
.../cassandra/triggers/TriggerExecutor.java | 71 ++--
.../cassandra/triggers/TriggerExecutorTest.java | 336 +++++++++++++++++++
.../apache/cassandra/triggers/TriggersTest.java | 22 +-
6 files changed, 410 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7206ce/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7206ce/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------