You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/23 20:32:12 UTC
git commit: Merge groupable mutations in TriggerExecutor#execute()
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 4e4d7bbcb -> c1eb74ce4
Merge groupable mutations in TriggerExecutor#execute()
patch by Aleksey Yeschenko and Sergio Bossa for CASSANDRA-7047
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1eb74ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1eb74ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1eb74ce
Branch: refs/heads/cassandra-2.0
Commit: c1eb74ce47988c1e75d20ccb9a0320dd305c4b1c
Parents: 4e4d7bb
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 21:30:36 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 21:30:36 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/RowMutation.java | 5 +
.../apache/cassandra/service/StorageProxy.java | 23 +-
.../cassandra/triggers/TriggerExecutor.java | 71 ++--
.../cassandra/triggers/TriggerExecutorTest.java | 332 +++++++++++++++++++
.../apache/cassandra/triggers/TriggersTest.java | 39 +--
6 files changed, 417 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dbed949..68c335d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
* Queries on compact tables can return more rows than requested (CASSANDRA-7052)
* USING TIMESTAMP for batches does not work (CASSANDRA-7053)
* Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
+ * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
Merged from 1.2:
* Fix batchlog to account for CF truncation records (CASSANDRA-6999)
* Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 49ee2c5..223225e 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -75,6 +75,11 @@ public class RowMutation implements IMutation
this(cf.metadata().ksName, key, cf);
}
+ public RowMutation copy()
+ {
+ return new RowMutation(keyspaceName, key, new HashMap<>(modifications));
+ }
+
public String getKeyspaceName()
{
return keyspaceName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8196352..14d5ee2 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -519,21 +519,20 @@ public class StorageProxy implements StorageProxyMBean
}
}
- public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+ @SuppressWarnings("unchecked")
+ public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
+ ConsistencyLevel consistencyLevel,
+ boolean mutateAtomically)
throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
{
- Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations);
- if (mutateAtomically || tmutations != null)
- {
- Collection<RowMutation> allMutations = new ArrayList<>((Collection<RowMutation>) mutations);
- if (tmutations != null)
- allMutations.addAll(tmutations);
- StorageProxy.mutateAtomically(allMutations, consistencyLevel);
- }
+ Collection<RowMutation> augmented = TriggerExecutor.instance.execute(mutations);
+
+ if (augmented != null)
+ mutateAtomically(augmented, consistencyLevel);
+ else if (mutateAtomically)
+ mutateAtomically((Collection<RowMutation>) mutations, consistencyLevel);
else
- {
- StorageProxy.mutate(mutations, consistencyLevel);
- }
+ mutate(mutations, consistencyLevel);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 8ccf937..988c6a7 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.triggers;
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -38,6 +36,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.utils.Pair;
public class TriggerExecutor
{
@@ -68,7 +67,7 @@ public class TriggerExecutor
public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
{
List<RowMutation> intermediate = executeInternal(key, updates);
- if (intermediate == null)
+ if (intermediate == null || intermediate.isEmpty())
return updates;
validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
@@ -80,30 +79,62 @@ public class TriggerExecutor
return updates;
}
- public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
+ public Collection<RowMutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
{
boolean hasCounters = false;
- Collection<RowMutation> tmutations = null;
- for (IMutation mutation : updates)
+ List<RowMutation> augmentedMutations = null;
+
+ for (IMutation mutation : mutations)
{
+ if (mutation instanceof CounterMutation)
+ hasCounters = true;
+
for (ColumnFamily cf : mutation.getColumnFamilies())
{
- List<RowMutation> intermediate = executeInternal(mutation.key(), cf);
- if (intermediate == null)
+ List<RowMutation> augmentations = executeInternal(mutation.key(), cf);
+ if (augmentations == null || augmentations.isEmpty())
continue;
- validate(intermediate);
- if (tmutations == null)
- tmutations = intermediate;
- else
- tmutations.addAll(intermediate);
+ validate(augmentations);
+
+ if (augmentedMutations == null)
+ augmentedMutations = new LinkedList<>();
+ augmentedMutations.addAll(augmentations);
}
- if (mutation instanceof CounterMutation)
- hasCounters = true;
}
- if (tmutations != null && hasCounters)
+
+ if (augmentedMutations == null)
+ return null;
+
+ if (hasCounters)
throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically.");
- return tmutations;
+
+ @SuppressWarnings("unchecked")
+ Collection<RowMutation> originalMutations = (Collection<RowMutation>) mutations;
+
+ return mergeMutations(Iterables.concat(originalMutations, augmentedMutations));
+ }
+
+ private Collection<RowMutation> mergeMutations(Iterable<RowMutation> mutations)
+ {
+ Map<Pair<String, ByteBuffer>, RowMutation> groupedMutations = new HashMap<>();
+
+ for (RowMutation mutation : mutations)
+ {
+ Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
+ RowMutation current = groupedMutations.get(key);
+ if (current == null)
+ {
+ // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
+ groupedMutations.put(key, mutation.copy());
+ }
+ else
+ {
+ current.addAll(mutation);
+ }
+ }
+
+ return groupedMutations.values();
}
private void validateForSinglePartition(AbstractType<?> keyValidator,
@@ -141,7 +172,7 @@ public class TriggerExecutor
*/
private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
{
- Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
+ Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
if (triggers.isEmpty())
return null;
List<RowMutation> tmutations = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
new file mode 100644
index 0000000..ab7f7c4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.triggers;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.TriggerDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.junit.Assert.*;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class TriggerExecutorTest
+{
+ @Test
+ public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+ ColumnFamily mutated = TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ assertEquals(bytes("v1"), mutated.getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutated.getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test(expected = InvalidRequestException.class)
+ public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+ TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ }
+
+ @Test(expected = InvalidRequestException.class)
+ public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+ TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+ }
+
+ @Test
+ public void noTriggerMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", NoOpTrigger.class.getName()));
+ RowMutation rm = new RowMutation(bytes("k1"), makeCf(metadata, "v1", null));
+ assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm)));
+ }
+
+ @Test
+ public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfPartialTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(2, mutatedCFs.size());
+
+ Collections.sort(mutatedCFs, new CfComparator());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(2, mutatedCFs.size());
+
+ Collections.sort(mutatedCFs, new CfComparator());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentKsTrigger.class.getName()));
+ ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+ ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+ RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+ RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+ assertEquals(4, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(2).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+ mutatedCFs = new ArrayList<>(tmutations.get(3).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ @Test
+ public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException
+ {
+ CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+ ColumnFamily cf = makeCf(metadata, "v1", null);
+ RowMutation rm = new RowMutation(UTF8Type.instance.fromString("k1"), cf);
+
+ List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
+ assertEquals(2, tmutations.size());
+ Collections.sort(tmutations, new RmComparator());
+
+ assertEquals(bytes("k1"), tmutations.get(0).key());
+ assertEquals(bytes("otherKey"), tmutations.get(1).key());
+
+ List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertEquals(bytes("v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+ mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+ assertEquals(1, mutatedCFs.size());
+ assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+ assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ }
+
+ private static CFMetaData makeCfMetaData(String ks, String cf, TriggerDefinition trigger)
+ {
+
+ CFMetaData metadata = new CFMetaData(ks, cf, ColumnFamilyType.Standard, CompositeType.getInstance(UTF8Type.instance));
+
+ metadata.keyValidator(UTF8Type.instance);
+ metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(UTF8Type.instance.fromString("pkey"),
+ UTF8Type.instance,
+ null));
+ metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c1"),
+ UTF8Type.instance,
+ 0));
+ metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c2"),
+ UTF8Type.instance,
+ 0));
+ try
+ {
+ if (trigger != null)
+ metadata.addTriggerDefinition(trigger);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ return metadata.rebuild();
+ }
+
+ private static ColumnFamily makeCf(CFMetaData metadata, String columnValue1, String columnValue2)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+
+ if (columnValue1 != null)
+ cf.addColumn(new Column(getColumnName(metadata, "c1"), bytes(columnValue1)));
+
+ if (columnValue2 != null)
+ cf.addColumn(new Column(getColumnName(metadata, "c2"), bytes(columnValue2)));
+
+ return cf;
+ }
+
+ private static ByteBuffer getColumnName(CFMetaData metadata, String stringName)
+ {
+ return ((CompositeType) metadata.comparator).builder().add(bytes(stringName)).build();
+ }
+
+ public static class NoOpTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ return null;
+ }
+ }
+
+ public static class SameKeySameCfTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeySameCfPartialTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ if (!key.equals(bytes("k2")))
+ return null;
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeyDifferentCfTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData(update.metadata().ksName, "otherCf", null));
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class SameKeyDifferentKsTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData("otherKs", "otherCf", null));
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+ }
+ }
+
+ public static class DifferentKeyTrigger implements ITrigger
+ {
+ public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+ cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+ return Collections.singletonList(new RowMutation(cf.metadata().ksName, bytes("otherKey"), cf));
+ }
+ }
+
+ private static class RmComparator implements Comparator<IMutation>
+ {
+ public int compare(IMutation m1, IMutation m2)
+ {
+ int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName());
+ return cmp != 0 ? cmp : m1.key().compareTo(m2.key());
+ }
+ }
+
+ private static class CfComparator implements Comparator<ColumnFamily>
+ {
+ public int compare(ColumnFamily cf1, ColumnFamily cf2)
+ {
+ return cf1.metadata().cfName.compareTo(cf2.metadata().cfName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 5b9b27d..bda13ff 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -50,6 +50,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
public class TriggersTest extends SchemaLoader
{
private static boolean triggerCreated = false;
@@ -126,7 +128,7 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.insert(ByteBufferUtil.bytes(2),
+ client.insert(bytes(2),
new ColumnParent(cfName),
getColumnForInsert("v1", 2),
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -147,7 +149,7 @@ public class TriggersTest extends SchemaLoader
cosc.setColumn(getColumnForInsert("v1", 3));
mutation.setColumn_or_supercolumn(cosc);
client.batch_mutate(
- Collections.singletonMap(ByteBufferUtil.bytes(3),
+ Collections.singletonMap(bytes(3),
Collections.singletonMap(cfName,
Collections.singletonList(mutation))),
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -183,9 +185,9 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.cas(ByteBufferUtil.bytes(6),
+ client.cas(bytes(6),
cfName,
- Collections.EMPTY_LIST,
+ Collections.<org.apache.cassandra.thrift.Column>emptyList(),
Collections.singletonList(getColumnForInsert("v1", 6)),
org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -241,9 +243,9 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.cas(ByteBufferUtil.bytes(9),
+ client.cas(bytes(9),
cf,
- Collections.EMPTY_LIST,
+ Collections.<org.apache.cassandra.thrift.Column>emptyList(),
Collections.singletonList(getColumnForInsert("v1", 9)),
org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -266,9 +268,9 @@ public class TriggersTest extends SchemaLoader
new TFramedTransportFactory().openTransport(
InetAddress.getLocalHost().getHostName(), 9170)));
client.set_keyspace(ksName);
- client.cas(ByteBufferUtil.bytes(10),
+ client.cas(bytes(10),
cf,
- Collections.EMPTY_LIST,
+ Collections.<org.apache.cassandra.thrift.Column>emptyList(),
Collections.singletonList(getColumnForInsert("v1", 10)),
org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -310,7 +312,7 @@ public class TriggersTest extends SchemaLoader
{
org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.fromString(columnName));
- column.setValue(ByteBufferUtil.bytes(value));
+ column.setValue(bytes(value));
column.setTimestamp(System.currentTimeMillis());
return column;
}
@@ -321,10 +323,8 @@ public class TriggersTest extends SchemaLoader
{
ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
- ByteBufferUtil.bytes(999)));
- RowMutation rm = new RowMutation(ksName, key);
- rm.add(extraUpdate);
- return Collections.singletonList(rm);
+ bytes(999)));
+ return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
}
}
@@ -334,12 +334,10 @@ public class TriggersTest extends SchemaLoader
{
ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
- ByteBufferUtil.bytes(999)));
+ bytes(999)));
int newKey = ByteBufferUtil.toInt(key) + 1000;
- RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey));
- rm.add(extraUpdate);
- return Collections.singletonList(rm);
+ return Collections.singletonList(new RowMutation(ksName, bytes(newKey), extraUpdate));
}
}
@@ -349,11 +347,8 @@ public class TriggersTest extends SchemaLoader
{
ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
- ByteBufferUtil.bytes(999)));
-
- RowMutation rm = new RowMutation(ksName, key);
- rm.add(extraUpdate);
- return Collections.singletonList(rm);
+ bytes(999)));
+ return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
}
}
}