You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/23 21:08:17 UTC

[1/3] git commit: Merge groupable mutations in TriggerExecutor#execute()

Repository: cassandra
Updated Branches:
  refs/heads/trunk f22e775fb -> ec7206ce6


Merge groupable mutations in TriggerExecutor#execute()

patch by Aleksey Yeschenko and Sergio Bossa for CASSANDRA-7047


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1eb74ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1eb74ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1eb74ce

Branch: refs/heads/trunk
Commit: c1eb74ce47988c1e75d20ccb9a0320dd305c4b1c
Parents: 4e4d7bb
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 21:30:36 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 21:30:36 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/RowMutation.java    |   5 +
 .../apache/cassandra/service/StorageProxy.java  |  23 +-
 .../cassandra/triggers/TriggerExecutor.java     |  71 ++--
 .../cassandra/triggers/TriggerExecutorTest.java | 332 +++++++++++++++++++
 .../apache/cassandra/triggers/TriggersTest.java |  39 +--
 6 files changed, 417 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dbed949..68c335d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
  * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
  * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
  * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
+ * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
 Merged from 1.2:
  * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
  * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 49ee2c5..223225e 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -75,6 +75,11 @@ public class RowMutation implements IMutation
         this(cf.metadata().ksName, key, cf);
     }
 
+    public RowMutation copy()
+    {
+        return new RowMutation(keyspaceName, key, new HashMap<>(modifications));
+    }
+
     public String getKeyspaceName()
     {
         return keyspaceName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8196352..14d5ee2 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -519,21 +519,20 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+    @SuppressWarnings("unchecked")
+    public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
+                                          ConsistencyLevel consistencyLevel,
+                                          boolean mutateAtomically)
     throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
     {
-        Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations);
-        if (mutateAtomically || tmutations != null)
-        {
-            Collection<RowMutation> allMutations = new ArrayList<>((Collection<RowMutation>) mutations);
-            if (tmutations != null)
-                allMutations.addAll(tmutations);
-            StorageProxy.mutateAtomically(allMutations, consistencyLevel);
-        }
+        Collection<RowMutation> augmented = TriggerExecutor.instance.execute(mutations);
+
+        if (augmented != null)
+            mutateAtomically(augmented, consistencyLevel);
+        else if (mutateAtomically)
+            mutateAtomically((Collection<RowMutation>) mutations, consistencyLevel);
         else
-        {
-            StorageProxy.mutate(mutations, consistencyLevel);
-        }
+            mutate(mutations, consistencyLevel);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 8ccf937..988c6a7 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.triggers;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -38,6 +36,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.HeapAllocator;
+import org.apache.cassandra.utils.Pair;
 
 public class TriggerExecutor
 {
@@ -68,7 +67,7 @@ public class TriggerExecutor
     public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
     {
         List<RowMutation> intermediate = executeInternal(key, updates);
-        if (intermediate == null)
+        if (intermediate == null || intermediate.isEmpty())
             return updates;
 
         validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
@@ -80,30 +79,62 @@ public class TriggerExecutor
         return updates;
     }
 
-    public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
+    public Collection<RowMutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
     {
         boolean hasCounters = false;
-        Collection<RowMutation> tmutations = null;
-        for (IMutation mutation : updates)
+        List<RowMutation> augmentedMutations = null;
+
+        for (IMutation mutation : mutations)
         {
+            if (mutation instanceof CounterMutation)
+                hasCounters = true;
+
             for (ColumnFamily cf : mutation.getColumnFamilies())
             {
-                List<RowMutation> intermediate = executeInternal(mutation.key(), cf);
-                if (intermediate == null)
+                List<RowMutation> augmentations = executeInternal(mutation.key(), cf);
+                if (augmentations == null || augmentations.isEmpty())
                     continue;
 
-                validate(intermediate);
-                if (tmutations == null)
-                    tmutations = intermediate;
-                else
-                    tmutations.addAll(intermediate);
+                validate(augmentations);
+
+                if (augmentedMutations == null)
+                    augmentedMutations = new LinkedList<>();
+                augmentedMutations.addAll(augmentations);
             }
-            if (mutation instanceof CounterMutation)
-                hasCounters = true;
         }
-        if (tmutations != null && hasCounters)
+
+        if (augmentedMutations == null)
+            return null;
+
+        if (hasCounters)
             throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically.");
-        return tmutations;
+
+        @SuppressWarnings("unchecked")
+        Collection<RowMutation> originalMutations = (Collection<RowMutation>) mutations;
+
+        return mergeMutations(Iterables.concat(originalMutations, augmentedMutations));
+    }
+
+    private Collection<RowMutation> mergeMutations(Iterable<RowMutation> mutations)
+    {
+        Map<Pair<String, ByteBuffer>, RowMutation> groupedMutations = new HashMap<>();
+
+        for (RowMutation mutation : mutations)
+        {
+            Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
+            RowMutation current = groupedMutations.get(key);
+            if (current == null)
+            {
+                // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
+                groupedMutations.put(key, mutation.copy());
+            }
+            else
+            {
+                current.addAll(mutation);
+            }
+        }
+
+        return groupedMutations.values();
     }
 
     private void validateForSinglePartition(AbstractType<?> keyValidator,
@@ -141,7 +172,7 @@ public class TriggerExecutor
      */
     private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
     {
-        Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
+        Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
         if (triggers.isEmpty())
             return null;
         List<RowMutation> tmutations = Lists.newLinkedList();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
new file mode 100644
index 0000000..ab7f7c4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.triggers;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.TriggerDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.junit.Assert.*;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class TriggerExecutorTest
+{
+    @Test
+    public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+        ColumnFamily mutated = TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+        assertEquals(bytes("v1"), mutated.getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutated.getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test(expected = InvalidRequestException.class)
+    public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+        TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+    }
+
+    @Test(expected = InvalidRequestException.class)
+    public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+        TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+    }
+
+    @Test
+    public void noTriggerMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", NoOpTrigger.class.getName()));
+        RowMutation rm = new RowMutation(bytes("k1"), makeCf(metadata, "v1", null));
+        assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm)));
+    }
+
+    @Test
+    public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfPartialTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(2, mutatedCFs.size());
+
+        Collections.sort(mutatedCFs, new CfComparator());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+        assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(2, mutatedCFs.size());
+
+        Collections.sort(mutatedCFs, new CfComparator());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+        assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentKsTrigger.class.getName()));
+        ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+        ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
+        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
+        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+        assertEquals(4, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(2).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+
+        mutatedCFs = new ArrayList<>(tmutations.get(3).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    @Test
+    public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException
+    {
+        CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+        ColumnFamily cf = makeCf(metadata, "v1", null);
+        RowMutation rm = new RowMutation(UTF8Type.instance.fromString("k1"), cf);
+
+        List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
+        assertEquals(2, tmutations.size());
+        Collections.sort(tmutations, new RmComparator());
+
+        assertEquals(bytes("k1"), tmutations.get(0).key());
+        assertEquals(bytes("otherKey"), tmutations.get(1).key());
+
+        List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertEquals(bytes("v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+
+        mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+        assertEquals(1, mutatedCFs.size());
+        assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+        assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+    }
+
+    private static CFMetaData makeCfMetaData(String ks, String cf, TriggerDefinition trigger)
+    {
+
+        CFMetaData metadata = new CFMetaData(ks, cf, ColumnFamilyType.Standard, CompositeType.getInstance(UTF8Type.instance));
+
+        metadata.keyValidator(UTF8Type.instance);
+        metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(UTF8Type.instance.fromString("pkey"),
+                                                                               UTF8Type.instance,
+                                                                               null));
+        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c1"),
+                                                                          UTF8Type.instance,
+                                                                          0));
+        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c2"),
+                                                                          UTF8Type.instance,
+                                                                          0));
+        try
+        {
+            if (trigger != null)
+                metadata.addTriggerDefinition(trigger);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        return metadata.rebuild();
+    }
+
+    private static ColumnFamily makeCf(CFMetaData metadata, String columnValue1, String columnValue2)
+    {
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+
+        if (columnValue1 != null)
+            cf.addColumn(new Column(getColumnName(metadata, "c1"), bytes(columnValue1)));
+
+        if (columnValue2 != null)
+            cf.addColumn(new Column(getColumnName(metadata, "c2"), bytes(columnValue2)));
+
+        return cf;
+    }
+
+    private static ByteBuffer getColumnName(CFMetaData metadata, String stringName)
+    {
+        return ((CompositeType) metadata.comparator).builder().add(bytes(stringName)).build();
+    }
+
+    public static class NoOpTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            return null;
+        }
+    }
+
+    public static class SameKeySameCfTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class SameKeySameCfPartialTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            if (!key.equals(bytes("k2")))
+                return null;
+
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class SameKeyDifferentCfTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData(update.metadata().ksName, "otherCf", null));
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class SameKeyDifferentKsTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData("otherKs", "otherCf", null));
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
+        }
+    }
+
+    public static class DifferentKeyTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
+            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
+            return Collections.singletonList(new RowMutation(cf.metadata().ksName, bytes("otherKey"), cf));
+        }
+    }
+
+    private static class RmComparator implements Comparator<IMutation>
+    {
+        public int compare(IMutation m1, IMutation m2)
+        {
+            int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName());
+            return cmp != 0 ? cmp : m1.key().compareTo(m2.key());
+        }
+    }
+
+    private static class CfComparator implements Comparator<ColumnFamily>
+    {
+        public int compare(ColumnFamily cf1, ColumnFamily cf2)
+        {
+            return cf1.metadata().cfName.compareTo(cf2.metadata().cfName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 5b9b27d..bda13ff 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -50,6 +50,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
 public class TriggersTest extends SchemaLoader
 {
     private static boolean triggerCreated = false;
@@ -126,7 +128,7 @@ public class TriggersTest extends SchemaLoader
                                             new TFramedTransportFactory().openTransport(
                                                 InetAddress.getLocalHost().getHostName(), 9170)));
         client.set_keyspace(ksName);
-        client.insert(ByteBufferUtil.bytes(2),
+        client.insert(bytes(2),
                       new ColumnParent(cfName),
                       getColumnForInsert("v1", 2),
                       org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -147,7 +149,7 @@ public class TriggersTest extends SchemaLoader
         cosc.setColumn(getColumnForInsert("v1", 3));
         mutation.setColumn_or_supercolumn(cosc);
         client.batch_mutate(
-            Collections.singletonMap(ByteBufferUtil.bytes(3),
+            Collections.singletonMap(bytes(3),
                                      Collections.singletonMap(cfName,
                                                               Collections.singletonList(mutation))),
             org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -183,9 +185,9 @@ public class TriggersTest extends SchemaLoader
                         new TFramedTransportFactory().openTransport(
                                 InetAddress.getLocalHost().getHostName(), 9170)));
         client.set_keyspace(ksName);
-        client.cas(ByteBufferUtil.bytes(6),
+        client.cas(bytes(6),
                    cfName,
-                   Collections.EMPTY_LIST,
+                   Collections.<org.apache.cassandra.thrift.Column>emptyList(),
                    Collections.singletonList(getColumnForInsert("v1", 6)),
                    org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
                    org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -241,9 +243,9 @@ public class TriggersTest extends SchemaLoader
                             new TFramedTransportFactory().openTransport(
                                     InetAddress.getLocalHost().getHostName(), 9170)));
             client.set_keyspace(ksName);
-            client.cas(ByteBufferUtil.bytes(9),
+            client.cas(bytes(9),
                        cf,
-                       Collections.EMPTY_LIST,
+                       Collections.<org.apache.cassandra.thrift.Column>emptyList(),
                        Collections.singletonList(getColumnForInsert("v1", 9)),
                        org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
                        org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -266,9 +268,9 @@ public class TriggersTest extends SchemaLoader
                             new TFramedTransportFactory().openTransport(
                                     InetAddress.getLocalHost().getHostName(), 9170)));
             client.set_keyspace(ksName);
-            client.cas(ByteBufferUtil.bytes(10),
+            client.cas(bytes(10),
                        cf,
-                       Collections.EMPTY_LIST,
+                       Collections.<org.apache.cassandra.thrift.Column>emptyList(),
                        Collections.singletonList(getColumnForInsert("v1", 10)),
                        org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL,
                        org.apache.cassandra.thrift.ConsistencyLevel.ONE);
@@ -310,7 +312,7 @@ public class TriggersTest extends SchemaLoader
     {
         org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
         column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.fromString(columnName));
-        column.setValue(ByteBufferUtil.bytes(value));
+        column.setValue(bytes(value));
         column.setTimestamp(System.currentTimeMillis());
         return column;
     }
@@ -321,10 +323,8 @@ public class TriggersTest extends SchemaLoader
         {
             ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
             extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
-                                             ByteBufferUtil.bytes(999)));
-            RowMutation rm = new RowMutation(ksName, key);
-            rm.add(extraUpdate);
-            return Collections.singletonList(rm);
+                                             bytes(999)));
+            return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
         }
     }
 
@@ -334,12 +334,10 @@ public class TriggersTest extends SchemaLoader
         {
             ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
             extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
-                                             ByteBufferUtil.bytes(999)));
+                                             bytes(999)));
 
             int newKey = ByteBufferUtil.toInt(key) + 1000;
-            RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey));
-            rm.add(extraUpdate);
-            return Collections.singletonList(rm);
+            return Collections.singletonList(new RowMutation(ksName, bytes(newKey), extraUpdate));
         }
     }
 
@@ -349,11 +347,8 @@ public class TriggersTest extends SchemaLoader
         {
             ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
             extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
-                                             ByteBufferUtil.bytes(999)));
-
-            RowMutation rm = new RowMutation(ksName, key);
-            rm.add(extraUpdate);
-            return Collections.singletonList(rm);
+                                             bytes(999)));
+            return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
         }
     }
 }


[2/3] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/service/StorageProxy.java
	src/java/org/apache/cassandra/triggers/TriggerExecutor.java
	test/unit/org/apache/cassandra/triggers/TriggersTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f521227
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f521227
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f521227

Branch: refs/heads/trunk
Commit: 1f5212278378bda5623c48eaef39f4a54aef2e49
Parents: 4e95953 c1eb74c
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 22:06:48 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 22:06:48 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Mutation.java  |   5 +
 .../apache/cassandra/service/StorageProxy.java  |  23 +-
 .../cassandra/triggers/TriggerExecutor.java     |  71 ++--
 .../cassandra/triggers/TriggerExecutorTest.java | 336 +++++++++++++++++++
 .../apache/cassandra/triggers/TriggersTest.java |  22 +-
 6 files changed, 410 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d32a107,68c335d..295eb78
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -107,10 -64,6 +107,11 @@@ Merged from 2.0
     (CASSANDRA-6906)
   * Fix SSTable not released if stream session fails (CASSANDRA-6818)
   * Avoid build failure due to ANTLR timeout (CASSANDRA-6991)
 + * Queries on compact tables can return more rows that requested (CASSANDRA-7052)
 + * USING TIMESTAMP for batches does not work (CASSANDRA-7053)
 + * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949)
 + * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058)
++ * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047)
  Merged from 1.2:
   * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
   * add extra SSL cipher suites (CASSANDRA-6613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Mutation.java
index 47c990f,0000000..6fae9b0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@@ -1,342 -1,0 +1,347 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.DataInput;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import org.apache.commons.lang3.StringUtils;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +// TODO convert this to a Builder pattern instead of encouraging M.add directly,
 +// which is less-efficient since we have to keep a mutable HashMap around
 +public class Mutation implements IMutation
 +{
 +    public static final MutationSerializer serializer = new MutationSerializer();
 +
 +    public static final String FORWARD_TO = "FWD_TO";
 +    public static final String FORWARD_FROM = "FWD_FRM";
 +
 +    // todo this is redundant
 +    // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test
 +    private final String keyspaceName;
 +
 +    private final ByteBuffer key;
 +    // map of column family id to mutations for that column family.
 +    private final Map<UUID, ColumnFamily> modifications;
 +
 +    public Mutation(String keyspaceName, ByteBuffer key)
 +    {
 +        this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
 +    }
 +
 +    public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
 +    {
 +        this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
 +    }
 +
 +    public Mutation(String keyspaceName, Row row)
 +    {
 +        this(keyspaceName, row.key.key, row.cf);
 +    }
 +
 +    protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
 +    {
 +        this.keyspaceName = keyspaceName;
 +        this.key = key;
 +        this.modifications = modifications;
 +    }
 +
 +    public Mutation(ByteBuffer key, ColumnFamily cf)
 +    {
 +        this(cf.metadata().ksName, key, cf);
 +    }
 +
++    public Mutation copy()
++    {
++        return new Mutation(keyspaceName, key, new HashMap<>(modifications));
++    }
++
 +    public String getKeyspaceName()
 +    {
 +        return keyspaceName;
 +    }
 +
 +    public Collection<UUID> getColumnFamilyIds()
 +    {
 +        return modifications.keySet();
 +    }
 +
 +    public ByteBuffer key()
 +    {
 +        return key;
 +    }
 +
 +    public Collection<ColumnFamily> getColumnFamilies()
 +    {
 +        return modifications.values();
 +    }
 +
 +    public ColumnFamily getColumnFamily(UUID cfId)
 +    {
 +        return modifications.get(cfId);
 +    }
 +
 +    /*
 +     * Specify a column family name and the corresponding column
 +     * family object.
 +     * param @ cf - column family name
 +     * param @ columnFamily - the column family.
 +     */
 +    public void add(ColumnFamily columnFamily)
 +    {
 +        assert columnFamily != null;
 +        ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily);
 +        if (prev != null)
 +            // developer error
 +            throw new IllegalArgumentException("ColumnFamily " + columnFamily + " already has modifications in this mutation: " + prev);
 +    }
 +
 +    /**
 +     * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary.
 +     */
 +    public ColumnFamily addOrGet(String cfName)
 +    {
 +        return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName));
 +    }
 +
 +    public ColumnFamily addOrGet(CFMetaData cfm)
 +    {
 +        ColumnFamily cf = modifications.get(cfm.cfId);
 +        if (cf == null)
 +        {
 +            cf = ArrayBackedSortedColumns.factory.create(cfm);
 +            modifications.put(cfm.cfId, cf);
 +        }
 +        return cf;
 +    }
 +
 +    public boolean isEmpty()
 +    {
 +        return modifications.isEmpty();
 +    }
 +
 +    public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive)
 +    {
 +        addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
 +    }
 +
 +    public void addCounter(String cfName, CellName name, long value)
 +    {
 +        addOrGet(cfName).addCounter(name, value);
 +    }
 +
 +    public void add(String cfName, CellName name, ByteBuffer value, long timestamp)
 +    {
 +        add(cfName, name, value, timestamp, 0);
 +    }
 +
 +    public void delete(String cfName, long timestamp)
 +    {
 +        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
 +        addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
 +    }
 +
 +    public void delete(String cfName, CellName name, long timestamp)
 +    {
 +        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
 +        addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
 +    }
 +
 +    public void deleteRange(String cfName, Composite start, Composite end, long timestamp)
 +    {
 +        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
 +        addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
 +    }
 +
 +    public void addAll(IMutation m)
 +    {
 +        if (!(m instanceof Mutation))
 +            throw new IllegalArgumentException();
 +
 +        Mutation mutation = (Mutation)m;
 +        if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key))
 +            throw new IllegalArgumentException();
 +
 +        for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
 +        {
 +            // It's slighty faster to assume the key wasn't present and fix if
 +            // not in the case where it wasn't there indeed.
 +            ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue());
 +            if (cf != null)
 +                entry.getValue().addAll(cf);
 +        }
 +    }
 +
 +    /*
 +     * This is equivalent to calling commit. Applies the changes to
 +     * to the keyspace that is obtained by calling Keyspace.open().
 +     */
 +    public void apply()
 +    {
 +        Keyspace ks = Keyspace.open(keyspaceName);
 +        ks.apply(this, ks.metadata.durableWrites);
 +    }
 +
 +    public void applyUnsafe()
 +    {
 +        Keyspace.open(keyspaceName).apply(this, false);
 +    }
 +
 +    public MessageOut<Mutation> createMessage()
 +    {
 +        return createMessage(MessagingService.Verb.MUTATION);
 +    }
 +
 +    public MessageOut<Mutation> createMessage(MessagingService.Verb verb)
 +    {
 +        return new MessageOut<>(verb, this, serializer);
 +    }
 +
 +    public long getTimeout()
 +    {
 +        return DatabaseDescriptor.getWriteRpcTimeout();
 +    }
 +
 +    public String toString()
 +    {
 +        return toString(false);
 +    }
 +
 +    public String toString(boolean shallow)
 +    {
 +        StringBuilder buff = new StringBuilder("Mutation(");
 +        buff.append("keyspace='").append(keyspaceName).append('\'');
 +        buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
 +        buff.append(", modifications=[");
 +        if (shallow)
 +        {
 +            List<String> cfnames = new ArrayList<String>(modifications.size());
 +            for (UUID cfid : modifications.keySet())
 +            {
 +                CFMetaData cfm = Schema.instance.getCFMetaData(cfid);
 +                cfnames.add(cfm == null ? "-dropped-" : cfm.cfName);
 +            }
 +            buff.append(StringUtils.join(cfnames, ", "));
 +        }
 +        else
 +            buff.append(StringUtils.join(modifications.values(), ", "));
 +        return buff.append("])").toString();
 +    }
 +
 +    public Mutation without(UUID cfId)
 +    {
 +        Mutation mutation = new Mutation(keyspaceName, key);
 +        for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
 +            if (!entry.getKey().equals(cfId))
 +                mutation.add(entry.getValue());
 +        return mutation;
 +    }
 +
 +    public static class MutationSerializer implements IVersionedSerializer<Mutation>
 +    {
 +        public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_20)
 +                out.writeUTF(mutation.getKeyspaceName());
 +
 +            ByteBufferUtil.writeWithShortLength(mutation.key(), out);
 +
 +            /* serialize the modifications in the mutation */
 +            int size = mutation.modifications.size();
 +            out.writeInt(size);
 +            assert size > 0;
 +            for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
 +                ColumnFamily.serializer.serialize(entry.getValue(), out, version);
 +        }
 +
 +        public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
 +        {
 +            String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
 +            if (version < MessagingService.VERSION_20)
 +                keyspaceName = in.readUTF();
 +
 +            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
 +            int size = in.readInt();
 +            assert size > 0;
 +
 +            Map<UUID, ColumnFamily> modifications;
 +            if (size == 1)
 +            {
 +                ColumnFamily cf = deserializeOneCf(in, version, flag);
 +                modifications = Collections.singletonMap(cf.id(), cf);
 +                keyspaceName = cf.metadata().ksName;
 +            }
 +            else
 +            {
 +                modifications = new HashMap<UUID, ColumnFamily>();
 +                for (int i = 0; i < size; ++i)
 +                {
 +                    ColumnFamily cf = deserializeOneCf(in, version, flag);
 +                    modifications.put(cf.id(), cf);
 +                    keyspaceName = cf.metadata().ksName;
 +                }
 +            }
 +
 +            return new Mutation(keyspaceName, key, modifications);
 +        }
 +
 +        private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
 +        {
 +            ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
 +            // We don't allow Mutation with null column family, so we should never get null back.
 +            assert cf != null;
 +            return cf;
 +        }
 +
 +        public Mutation deserialize(DataInput in, int version) throws IOException
 +        {
 +            return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
 +        }
 +
 +        public long serializedSize(Mutation mutation, int version)
 +        {
 +            TypeSizes sizes = TypeSizes.NATIVE;
 +            int size = 0;
 +
 +            if (version < MessagingService.VERSION_20)
 +                size += sizes.sizeof(mutation.getKeyspaceName());
 +
 +            int keySize = mutation.key().remaining();
 +            size += sizes.sizeof((short) keySize) + keySize;
 +
 +            size += sizes.sizeof(mutation.modifications.size());
 +            for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet())
 +                size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version);
 +
 +            return size;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index d8c5813,14d5ee2..269d68f
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -522,21 -519,20 +522,20 @@@ public class StorageProxy implements St
          }
      }
  
-     public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+     @SuppressWarnings("unchecked")
+     public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
+                                           ConsistencyLevel consistencyLevel,
+                                           boolean mutateAtomically)
      throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
      {
-         Collection<Mutation> tmutations = TriggerExecutor.instance.execute(mutations);
-         if (mutateAtomically || tmutations != null)
-         {
-             Collection<Mutation> allMutations = new ArrayList<>((Collection<Mutation>) mutations);
-             if (tmutations != null)
-                 allMutations.addAll(tmutations);
-             StorageProxy.mutateAtomically(allMutations, consistencyLevel);
-         }
 -        Collection<RowMutation> augmented = TriggerExecutor.instance.execute(mutations);
++        Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
+ 
+         if (augmented != null)
+             mutateAtomically(augmented, consistencyLevel);
+         else if (mutateAtomically)
 -            mutateAtomically((Collection<RowMutation>) mutations, consistencyLevel);
++            mutateAtomically((Collection<Mutation>) mutations, consistencyLevel);
          else
-         {
-             StorageProxy.mutate(mutations, consistencyLevel);
-         }
+             mutate(mutations, consistencyLevel);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 8a8c51d,988c6a7..4416a57
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@@ -35,6 -35,8 +33,7 @@@ import org.apache.cassandra.db.composit
  import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.HeapAllocator;
+ import org.apache.cassandra.utils.Pair;
  
  public class TriggerExecutor
  {
@@@ -64,8 -66,8 +63,8 @@@
  
      public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
      {
 -        List<RowMutation> intermediate = executeInternal(key, updates);
 +        List<Mutation> intermediate = executeInternal(key, updates);
-         if (intermediate == null)
+         if (intermediate == null || intermediate.isEmpty())
              return updates;
  
          validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
@@@ -80,30 -79,62 +79,62 @@@
          return updates;
      }
  
-     public Collection<Mutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException
 -    public Collection<RowMutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
++    public Collection<Mutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException
      {
          boolean hasCounters = false;
-         Collection<Mutation> tmutations = null;
-         for (IMutation mutation : updates)
 -        List<RowMutation> augmentedMutations = null;
++        List<Mutation> augmentedMutations = null;
+ 
+         for (IMutation mutation : mutations)
          {
+             if (mutation instanceof CounterMutation)
+                 hasCounters = true;
+ 
              for (ColumnFamily cf : mutation.getColumnFamilies())
              {
-                 List<Mutation> intermediate = executeInternal(mutation.key(), cf);
-                 if (intermediate == null)
 -                List<RowMutation> augmentations = executeInternal(mutation.key(), cf);
++                List<Mutation> augmentations = executeInternal(mutation.key(), cf);
+                 if (augmentations == null || augmentations.isEmpty())
                      continue;
  
-                 validate(intermediate);
-                 if (tmutations == null)
-                     tmutations = intermediate;
-                 else
-                     tmutations.addAll(intermediate);
+                 validate(augmentations);
+ 
+                 if (augmentedMutations == null)
+                     augmentedMutations = new LinkedList<>();
+                 augmentedMutations.addAll(augmentations);
              }
-             if (mutation instanceof CounterMutation)
-                 hasCounters = true;
          }
-         if (tmutations != null && hasCounters)
+ 
+         if (augmentedMutations == null)
+             return null;
+ 
+         if (hasCounters)
              throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically.");
-         return tmutations;
+ 
+         @SuppressWarnings("unchecked")
 -        Collection<RowMutation> originalMutations = (Collection<RowMutation>) mutations;
++        Collection<Mutation> originalMutations = (Collection<Mutation>) mutations;
+ 
+         return mergeMutations(Iterables.concat(originalMutations, augmentedMutations));
+     }
+ 
 -    private Collection<RowMutation> mergeMutations(Iterable<RowMutation> mutations)
++    private Collection<Mutation> mergeMutations(Iterable<Mutation> mutations)
+     {
 -        Map<Pair<String, ByteBuffer>, RowMutation> groupedMutations = new HashMap<>();
++        Map<Pair<String, ByteBuffer>, Mutation> groupedMutations = new HashMap<>();
+ 
 -        for (RowMutation mutation : mutations)
++        for (Mutation mutation : mutations)
+         {
+             Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
 -            RowMutation current = groupedMutations.get(key);
++            Mutation current = groupedMutations.get(key);
+             if (current == null)
+             {
+                 // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
+                 groupedMutations.put(key, mutation.copy());
+             }
+             else
+             {
+                 current.addAll(mutation);
+             }
+         }
+ 
+         return groupedMutations.values();
      }
  
      private void validateForSinglePartition(AbstractType<?> keyValidator,
@@@ -141,12 -170,12 +172,12 @@@
       * Switch class loader before using the triggers for the column family, if
       * not loaded them with the custom class loader.
       */
 -    private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
 +    private List<Mutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
      {
-         Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
+         Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
          if (triggers.isEmpty())
              return null;
 -        List<RowMutation> tmutations = Lists.newLinkedList();
 +        List<Mutation> tmutations = Lists.newLinkedList();
          Thread.currentThread().setContextClassLoader(customClassLoader);
          try
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
index 0000000,ab7f7c4..b0c45b9
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@@ -1,0 -1,332 +1,336 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.triggers;
+ 
+ import java.nio.ByteBuffer;
+ import java.util.*;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.TriggerDefinition;
+ import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.composites.CellName;
+ import org.apache.cassandra.db.marshal.CompositeType;
+ import org.apache.cassandra.db.marshal.UTF8Type;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ 
+ import static org.junit.Assert.*;
+ 
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ 
+ public class TriggerExecutorTest
+ {
+     @Test
+     public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+         ColumnFamily mutated = TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+         assertEquals(bytes("v1"), mutated.getColumn(getColumnName(metadata, "c1")).value());
+         assertEquals(bytes("trigger"), mutated.getColumn(getColumnName(metadata, "c2")).value());
+     }
+ 
+     @Test(expected = InvalidRequestException.class)
+     public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+         TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+     }
+ 
+     @Test(expected = InvalidRequestException.class)
+     public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+         TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null));
+     }
+ 
+     @Test
+     public void noTriggerMutations() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", NoOpTrigger.class.getName()));
 -        RowMutation rm = new RowMutation(bytes("k1"), makeCf(metadata, "v1", null));
++        Mutation rm = new Mutation(bytes("k1"), makeCf(metadata, "v1", null));
+         assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm)));
+     }
+ 
+     @Test
+     public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName()));
+         ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+         ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
 -        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
 -        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++        Mutation rm1 = new Mutation(bytes("k1"), cf1);
++        Mutation rm2 = new Mutation(bytes("k2"), cf2);
+ 
+         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+         assertEquals(2, tmutations.size());
+         Collections.sort(tmutations, new RmComparator());
+ 
+         List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ 
+         mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+     }
+ 
+     @Test
+     public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfPartialTrigger.class.getName()));
+         ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+         ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
 -        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
 -        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++        Mutation rm1 = new Mutation(bytes("k1"), cf1);
++        Mutation rm2 = new Mutation(bytes("k2"), cf2);
+ 
+         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+         assertEquals(2, tmutations.size());
+         Collections.sort(tmutations, new RmComparator());
+ 
+         List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ 
+         mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+     }
+ 
+     @Test
+     public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName()));
+         ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+         ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
 -        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
 -        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++        Mutation rm1 = new Mutation(bytes("k1"), cf1);
++        Mutation rm2 = new Mutation(bytes("k2"), cf2);
+ 
+         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+         assertEquals(2, tmutations.size());
+         Collections.sort(tmutations, new RmComparator());
+ 
+         List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+         assertEquals(2, mutatedCFs.size());
+ 
+         Collections.sort(mutatedCFs, new CfComparator());
+         assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+         assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+         assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+ 
+         mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+         assertEquals(2, mutatedCFs.size());
+ 
+         Collections.sort(mutatedCFs, new CfComparator());
+         assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+         assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1")));
+         assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value());
+     }
+ 
+     @Test
+     public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentKsTrigger.class.getName()));
+         ColumnFamily cf1 = makeCf(metadata, "k1v1", null);
+         ColumnFamily cf2 = makeCf(metadata, "k2v1", null);
 -        RowMutation rm1 = new RowMutation(bytes("k1"), cf1);
 -        RowMutation rm2 = new RowMutation(bytes("k2"), cf2);
++        Mutation rm1 = new Mutation(bytes("k1"), cf1);
++        Mutation rm2 = new Mutation(bytes("k2"), cf2);
+ 
+         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2)));
+         assertEquals(4, tmutations.size());
+         Collections.sort(tmutations, new RmComparator());
+ 
+         List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ 
+         mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ 
+         mutatedCFs = new ArrayList<>(tmutations.get(2).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+         assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+ 
+         mutatedCFs = new ArrayList<>(tmutations.get(3).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+         assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+     }
+ 
+     @Test
+     public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException
+     {
+         CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName()));
+         ColumnFamily cf = makeCf(metadata, "v1", null);
 -        RowMutation rm = new RowMutation(UTF8Type.instance.fromString("k1"), cf);
++        Mutation rm = new Mutation(UTF8Type.instance.fromString("k1"), cf);
+ 
+         List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm)));
+         assertEquals(2, tmutations.size());
+         Collections.sort(tmutations, new RmComparator());
+ 
+         assertEquals(bytes("k1"), tmutations.get(0).key());
+         assertEquals(bytes("otherKey"), tmutations.get(1).key());
+ 
+         List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertEquals(bytes("v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")));
+ 
+         mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies());
+         assertEquals(1, mutatedCFs.size());
+         assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")));
+         assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value());
+     }
+ 
+     private static CFMetaData makeCfMetaData(String ks, String cf, TriggerDefinition trigger)
+     {
+ 
 -        CFMetaData metadata = new CFMetaData(ks, cf, ColumnFamilyType.Standard, CompositeType.getInstance(UTF8Type.instance));
++        CFMetaData metadata = CFMetaData.sparseCFMetaData(ks, cf, CompositeType.getInstance(UTF8Type.instance));
+ 
+         metadata.keyValidator(UTF8Type.instance);
 -        metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(UTF8Type.instance.fromString("pkey"),
++        metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(metadata,
++                                                                               UTF8Type.instance.fromString("pkey"),
+                                                                                UTF8Type.instance,
+                                                                                null));
 -        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c1"),
++        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(metadata,
++                                                                          UTF8Type.instance.fromString("c1"),
+                                                                           UTF8Type.instance,
+                                                                           0));
 -        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c2"),
++        metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(metadata,
++                                                                          UTF8Type.instance.fromString("c2"),
+                                                                           UTF8Type.instance,
+                                                                           0));
+         try
+         {
+             if (trigger != null)
+                 metadata.addTriggerDefinition(trigger);
+         }
+         catch (ConfigurationException e)
+         {
+             throw new AssertionError(e);
+         }
+ 
+         return metadata.rebuild();
+     }
+ 
+     private static ColumnFamily makeCf(CFMetaData metadata, String columnValue1, String columnValue2)
+     {
+         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+ 
+         if (columnValue1 != null)
 -            cf.addColumn(new Column(getColumnName(metadata, "c1"), bytes(columnValue1)));
++            cf.addColumn(new Cell(getColumnName(metadata, "c1"), bytes(columnValue1)));
+ 
+         if (columnValue2 != null)
 -            cf.addColumn(new Column(getColumnName(metadata, "c2"), bytes(columnValue2)));
++            cf.addColumn(new Cell(getColumnName(metadata, "c2"), bytes(columnValue2)));
+ 
+         return cf;
+     }
+ 
 -    private static ByteBuffer getColumnName(CFMetaData metadata, String stringName)
++    private static CellName getColumnName(CFMetaData metadata, String stringName)
+     {
 -        return ((CompositeType) metadata.comparator).builder().add(bytes(stringName)).build();
++        return metadata.comparator.makeCellName(stringName);
+     }
+ 
+     public static class NoOpTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             return null;
+         }
+     }
+ 
+     public static class SameKeySameCfTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
 -            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
 -            return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
++            cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++            return Collections.singletonList(new Mutation(update.metadata().ksName, key, cf));
+         }
+     }
+ 
+     public static class SameKeySameCfPartialTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             if (!key.equals(bytes("k2")))
+                 return null;
+ 
+             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
 -            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
 -            return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf));
++            cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++            return Collections.singletonList(new Mutation(update.metadata().ksName, key, cf));
+         }
+     }
+ 
+     public static class SameKeyDifferentCfTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData(update.metadata().ksName, "otherCf", null));
 -            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
 -            return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
++            cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++            return Collections.singletonList(new Mutation(cf.metadata().ksName, key, cf));
+         }
+     }
+ 
+     public static class SameKeyDifferentKsTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData("otherKs", "otherCf", null));
 -            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
 -            return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf));
++            cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++            return Collections.singletonList(new Mutation(cf.metadata().ksName, key, cf));
+         }
+     }
+ 
+     public static class DifferentKeyTrigger implements ITrigger
+     {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+         {
+             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata());
 -            cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger")));
 -            return Collections.singletonList(new RowMutation(cf.metadata().ksName, bytes("otherKey"), cf));
++            cf.addColumn(new Cell(getColumnName(update.metadata(), "c2"), bytes("trigger")));
++            return Collections.singletonList(new Mutation(cf.metadata().ksName, bytes("otherKey"), cf));
+         }
+     }
+ 
+     private static class RmComparator implements Comparator<IMutation>
+     {
+         public int compare(IMutation m1, IMutation m2)
+         {
+             int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName());
+             return cmp != 0 ? cmp : m1.key().compareTo(m2.key());
+         }
+     }
+ 
+     private static class CfComparator implements Comparator<ColumnFamily>
+     {
+         public int compare(ColumnFamily cf1, ColumnFamily cf2)
+         {
+             return cf1.metadata().cfName.compareTo(cf2.metadata().cfName);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f521227/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 79133e2,bda13ff..28464e5
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@@ -313,43 -319,36 +313,33 @@@ public class TriggersTest extends Schem
  
      public static class TestTrigger implements ITrigger
      {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
 +        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
          {
              ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
-             extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")),
-                                            bytes(999)));
-             Mutation mutation = new Mutation(ksName, key);
-             mutation.add(extraUpdate);
-             return Collections.singletonList(mutation);
 -            extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
 -                                             bytes(999)));
 -            return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
++            extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")), bytes(999)));
++            return Collections.singletonList(new Mutation(ksName, key, extraUpdate));
          }
      }
  
      public static class CrossPartitionTrigger implements ITrigger
      {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
 +        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
          {
              ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
-             extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")),
-                                            bytes(999)));
 -            extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
 -                                             bytes(999)));
++            extraUpdate.addColumn(new Cell(update.metadata().comparator.makeCellName(bytes("v2")), bytes(999)));
  
 -            int newKey = ByteBufferUtil.toInt(key) + 1000;
 -            return Collections.singletonList(new RowMutation(ksName, bytes(newKey), extraUpdate));
 +            int newKey = toInt(key) + 1000;
-             Mutation mutation = new Mutation(ksName, bytes(newKey));
-             mutation.add(extraUpdate);
-             return Collections.singletonList(mutation);
++            return Collections.singletonList(new Mutation(ksName, bytes(newKey), extraUpdate));
          }
      }
  
      public static class CrossTableTrigger implements ITrigger
      {
 -        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
 +        public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
          {
              ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf);
-             extraUpdate.addColumn(new Cell(extraUpdate.metadata().comparator.makeCellName(bytes("v2")),
-                                            bytes(999)));
- 
-             Mutation mutation = new Mutation(ksName, key);
-             mutation.add(extraUpdate);
-             return Collections.singletonList(mutation);
 -            extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"),
 -                                             bytes(999)));
 -            return Collections.singletonList(new RowMutation(ksName, key, extraUpdate));
++            extraUpdate.addColumn(new Cell(extraUpdate.metadata().comparator.makeCellName(bytes("v2")), bytes(999)));
++            return Collections.singletonList(new Mutation(ksName, key, extraUpdate));
          }
      }
  }


[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec7206ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec7206ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec7206ce

Branch: refs/heads/trunk
Commit: ec7206ce6a50d2ca03ac23f8ba6ec8eea9cfe6d8
Parents: f22e775 1f52122
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Apr 23 22:07:10 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Apr 23 22:07:10 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Mutation.java  |   5 +
 .../apache/cassandra/service/StorageProxy.java  |  23 +-
 .../cassandra/triggers/TriggerExecutor.java     |  71 ++--
 .../cassandra/triggers/TriggerExecutorTest.java | 336 +++++++++++++++++++
 .../apache/cassandra/triggers/TriggersTest.java |  22 +-
 6 files changed, 410 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7206ce/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec7206ce/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------