You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/23 20:32:12 UTC

git commit: Merge groupable mutations in TriggerExecutor#execute()

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 4e4d7bbcb -> c1eb74ce4


Merge groupable mutations in TriggerExecutor#execute()

patch by Aleksey Yeschenko and Sergio Bossa for CASSANDRA-7047


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1eb74ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1eb74ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1eb74ce

Branch: refs/heads/cassandra-2.0
Commit: c1eb74ce47988c1e75d20ccb9a0320dd305c4b1c
Parents: 4e4d7bb
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 21:30:36 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 21:30:36 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/RowMutation.java    |   5 +
 .../apache/cassandra/service/StorageProxy.java  |  23 +-
 .../cassandra/triggers/TriggerExecutor.java     |  71 ++--
 .../cassandra/triggers/TriggerExecutorTest.java | 332 +++++++++++++++++++
 .../apache/cassandra/triggers/TriggersTest.java |  39 +--
 6 files changed, 417 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dbed949..68c335d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
  * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
  * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
  * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
+ * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
 Merged from 1.2:
  * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
  * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 49ee2c5..223225e 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -75,6 +75,11 @@ public class RowMutation implements IMutation
         this(cf.metadata().ksName, key, cf);
     }
 
+    public RowMutation copy()
+    {
+        return new RowMutation(keyspaceName, key, new HashMap<>(modifications));
+    }
+
     public String getKeyspaceName()
     {
         return keyspaceName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8196352..14d5ee2 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -519,21 +519,20 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+    @SuppressWarnings("unchecked")
+    public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
+                                          ConsistencyLevel consistencyLevel,
+                                          boolean mutateAtomically)
     throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
     {
-        Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations);
-        if (mutateAtomically || tmutations != null)
-        {
-            Collection<RowMutation> allMutations = new ArrayList<>((Collection<RowMutation>) mutations);
-            if (tmutations != null)
-                allMutations.addAll(tmutations);
-            StorageProxy.mutateAtomically(allMutations, consistencyLevel);
-        }
+        Collection<RowMutation> augmented = TriggerExecutor.instance.execute(mutations);
+
+        if (augmented != null)
+            mutateAtomically(augmented, consistencyLevel);
+        else if (mutateAtomically)
+            mutateAtomically((Collection<RowMutation>) mutations, consistencyLevel);
         else
-        {
-            StorageProxy.mutate(mutations, consistencyLevel);
-        }
+            mutate(mutations, consistencyLevel);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 8ccf937..988c6a7 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.triggers;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -38,6 +36,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.utils.Pair;
 
 public class TriggerExecutor
 {
@@ -68,7 +67,7 @@ public class TriggerExecutor
     public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
     {
         List<RowMutation> intermediate = executeInternal(key, updates);
-        if (intermediate == null)
+        if (intermediate == null || intermediate.isEmpty())
             return updates;
 
         validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
@@ -80,30 +79,62 @@ public class TriggerExecutor
         return updates;
     }
 
-    public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
+    public Collection<RowMutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
     {
         boolean hasCounters = false;
-        Collection<RowMutation> tmutations = null;
-        for (IMutation mutation : updates)
+        List<RowMutation> augmentedMutations = null;
+
+        for (IMutation mutation : mutations)
         {
+            if (mutation instanceof CounterMutation)
+                hasCounters = true;
+
             for (ColumnFamily cf : mutation.getColumnFamilies())
             {
-                List<RowMutation> intermediate = executeInternal(mutation.key(), cf);
-                if (intermediate == null)
+                List<RowMutation> augmentations = executeInternal(mutation.key(), cf);
+                if (augmentations == null || augmentations.isEmpty())
                     continue;
 
-                validate(intermediate);
-                if (tmutations == null)
-                    tmutations = intermediate;
-                else
-                    tmutations.addAll(intermediate);
+                validate(augmentations);
+
+                if (augmentedMutations == null)
+                    augmentedMutations = new LinkedList<>();
+                augmentedMutations.addAll(augmentations);
             }
-            if (mutation instanceof CounterMutation)
-                hasCounters = true;
         }
-        if (tmutations != null && hasCounters)
+
+        if (augmentedMutations == null)
+            return null;
+
+        if (hasCounters)
             throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically.");
-        return tmutations;
+
+        @SuppressWarnings("unchecked")
+        Collection<RowMutation> originalMutations = (Collection<RowMutation>) mutations;
+
+        return mergeMutations(Iterables.concat(originalMutations, augmentedMutations));
+    }
+
+    private Collection<RowMutation> mergeMutations(Iterable<RowMutation> mutations)
+    {
+        Map<Pair<String, ByteBuffer>, RowMutation> groupedMutations = new HashMap<>();
+
+        for (RowMutation mutation : mutations)
+        {
+            Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
+            RowMutation current = groupedMutations.get(key);
+            if (current == null)
+            {
+                // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
+                groupedMutations.put(key, mutation.copy());
+            }
+            else
+            {
+                current.addAll(mutation);
+            }
+        }
+
+        return groupedMutations.values();
     }
 
     private void validateForSinglePartition(AbstractType<?> keyValidator,
@@ -141,7 +172,7 @@ public class TriggerExecutor
      */
     private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
     {
-        Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
+        Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
         if (triggers.isEmpty())
             return null;
         List<RowMutation> tmutations = Lists.newLinkedList();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
new file mode 100644
index 0000000..ab7f7c4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.triggers;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.TriggerDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.junit.Assert.*;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class TriggerExecutorTest
+{
+    @Test
+    public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+        ColumnFamily mutated = TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+        assertEquals(bytes("v1"), mutated.getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutated.getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test(expected = InvalidRequestException.class)
+    public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+        TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+    }
+
+    @Test(expected = InvalidRequestException.class)
+    public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+        TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+    }
+
+    @Test
+    public void noTriggerMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", NoOpTrigger.class.getName()));
+        RowMutation rm = new RowMutation(bytes("k1"), makeCf(metadata, "v1", null));
+        assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm)));
+    }
+
+    @Test
+    public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfPartialTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(2, mutatedCFs.size());
+
+        Collections.sort(mutatedCFs, new CfComparator());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+        assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(2, mutatedCFs.size());
+
+        Collections.sort(mutatedCFs, new CfComparator());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+        assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentKsTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(4, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(2).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+        mutatedCFs = new ArrayList<>(tmutations.get(3).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+        ColumnFamily cf = makeCf(metadata, "v1", null);
+        RowMutation rm = new RowMutation(UTF8Type.instance.fromString("k1"), cf);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        assertEquals(bytes("k1"), tmutations.get(0).key());
+        assertEquals(bytes("otherKey"), tmutations.get(1).key());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    private static CFMetaData makeCfMetaData(String ks, String cf, TriggerDefinition trigger)
+    {
+
+        CFMetaData metadata = new CFMetaData(ks, cf, ColumnFamilyType.Standard, CompositeType.getInstance(UTF8Type.instance));
+
+        metadata.keyValidator(UTF8Type.instance);
+        metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(UTF8Type.instance.fromString("pkey"),
+                                                                               UTF8Type.instance,
+                                                                               null));
+        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c1"),
+                                                                          UTF8Type.instance,
+                                                                          0));
+        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c2"),
+                                                                          UTF8Type.instance,
+                                                                          0));
+        try
+        {
+            if (trigger != null)
+                metadata.addTriggerDefinition(trigger);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        return metadata.rebuild();
+    }
+
+    private static ColumnFamily makeCf(CFMetaData metadata, String columnValue1, String columnValue2)
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+
+        if (columnValue1 != null)
+            cf.addColumn(new Column(getColumnName(metadata, "c1"), bytes(columnValue1)));
+
+        if (columnValue2 != null)
+            cf.addColumn(new Column(getColumnName(metadata, "c2"), bytes(columnValue2)));
+
+        return cf;
+    }
+
+    private static ByteBuffer getColumnName(CFMetaData metadata, String stringName)
+    {
+        return ((CompositeType) metadata.comparator).builder().add(bytes(stringName)).build();
+    }
+
+    public static class NoOpTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            return null;
+        }
+    }
+
+    public static class SameKeySameCfTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class SameKeySameCfPartialTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            if (!key.equals(bytes("k2")))
+                return null;
+
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class SameKeyDifferentCfTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData(update.metadata().ksName, "otherCf", null));
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class SameKeyDifferentKsTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData("otherKs", "otherCf", null));
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class DifferentKeyTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(cf.metadata().ksName, bytes("otherKey"), cf));
+        }
+    }
+
+    private static class RmComparator implements Comparator<IMutation>
+    {
+        public int compare(IMutation m1, IMutation m2)
+        {
+            int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName());
+            return cmp != 0 ? cmp : m1.key().compareTo(m2.key());
+        }
+    }
+
+    private static class CfComparator implements Comparator<ColumnFamily>
+    {
+        public int compare(ColumnFamily cf1, ColumnFamily cf2)
+        {
+            return cf1.metadata().cfName.compareTo(cf2.metadata().cfName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 5b9b27d..bda13ff 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -50,6 +50,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
 public class TriggersTest extends SchemaLoader
 {
     private static boolean triggerCreated = false;
@@ -126,7 +128,7 @@ public class TriggersTest extends SchemaLoader
                                             new TFramedTransportFactory().openTransport(
                                                 InetAddress.getLocalHost().getHostName(), 9170)));
         client.set_keyspace(ksName);
-        client.insert(ByteBufferUtil.bytes(2),
+        client.insert(bytes(2),
                       new ColumnParent(cfName),
                       getColumnForInsert("v1", 2),
                       org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -147,7 +149,7 @@ public class TriggersTest extends SchemaLoader
         cosc.setColumn(getColumnForInsert("v1", 3));
         mutation.setColumn_or_supercolumn(cosc);
         client.batch_mutate(
-            Collections.singletonMap(ByteBufferUtil.bytes(3),
+            Collections.singletonMap(bytes(3),
                                      Collections.singletonMap(cfName,
                                                               Collections.singletonList(mutation))),
             org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -183,9 +185,9 @@ public class TriggersTest extends SchemaLoader
                         new TFramedTransportFactory().openTransport(
                                 InetAddress.getLocalHost().getHostName(), 9170)));
         client.set_keyspace(ksName);
-        client.cas(ByteBufferUtil.bytes(6),
+        client.cas(bytes(6),
                    cfName,
-                   Collections.EMPTY_LIST,
+                   Collections.<org.apache.cassandra.thrift.Column>emptyList(),
                    Collections.singletonList(getColumnForInsert("v1", 6)),
                    org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
                    org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -241,9 +243,9 @@ public class TriggersTest extends SchemaLoader
                             new TFramedTransportFactory().openTransport(
                                     InetAddress.getLocalHost().getHostName(), 9170)));
             client.set_keyspace(ksName);
-            client.cas(ByteBufferUtil.bytes(9),
+            client.cas(bytes(9),
                        cf,
-                       Collections.EMPTY_LIST,
+                       Collections.<org.apache.cassandra.thrift.Column>emptyList(),
                        Collections.singletonList(getColumnForInsert("v1", 9)),
                        org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
                        org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -266,9 +268,9 @@ public class TriggersTest extends SchemaLoader
                             new TFramedTransportFactory().openTransport(
                                     InetAddress.getLocalHost().getHostName(), 9170)));
             client.set_keyspace(ksName);
-            client.cas(ByteBufferUtil.bytes(10),
+            client.cas(bytes(10),
                        cf,
-                       Collections.EMPTY_LIST,
+                       Collections.<org.apache.cassandra.thrift.Column>emptyList(),
                        Collections.singletonList(getColumnForInsert("v1", 10)),
                        org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
                        org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -310,7 +312,7 @@ public class TriggersTest extends SchemaLoader
     {
         org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
         column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.fromString(columnName));
-        column.setValue(ByteBufferUtil.bytes(value));
+        column.setValue(bytes(value));
         column.setTimestamp(System.currentTimeMillis());
         return column;
     }
@@ -321,10 +323,8 @@ public class TriggersTest extends SchemaLoader
         {
             ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
             extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
-                                             ByteBufferUtil.bytes(999)));
-            RowMutation rm = new RowMutation(ksName, key);
-            rm.add(extraUpdate);
-            return Collections.singletonList(rm);
+                                             bytes(999)));
+            return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
         }
     }
 
@@ -334,12 +334,10 @@ public class TriggersTest extends SchemaLoader
         {
             ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
             extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
-                                             ByteBufferUtil.bytes(999)));
+                                             bytes(999)));
 
             int newKey = ByteBufferUtil.toInt(key) + 1000;
-            RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey));
-            rm.add(extraUpdate);
-            return Collections.singletonList(rm);
+            return Collections.singletonList(new RowMutation(ksName, bytes(newKey), extraUpdate));
         }
     }
 
@@ -349,11 +347,8 @@ public class TriggersTest extends SchemaLoader
         {
             ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
             extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
-                                             ByteBufferUtil.bytes(999)));
-
-            RowMutation rm = new RowMutation(ksName, key);
-            rm.add(extraUpdate);
-            return Collections.singletonList(rm);
+                                             bytes(999)));
+            return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
         }
     }
 }