You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/07/29 22:16:16 UTC

[3/5] cassandra git commit: Materialized Views

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
new file mode 100644
index 0000000..4dfea75
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.utils.FBUtilities;
+
+public final class MaterializedViewUtils
+{
+    private MaterializedViewUtils()
+    {
+    }
+
+    /**
+     * Calculate the natural endpoint for the view.
+     *
+     * The view natural endpoint is the endpint which has the same cardinality as this node in the replication factor.
+     * The cardinality is the number at which this node would store a piece of data, given the change in replication
+     * factor.
+     *
+     * For example, if we have the following ring:
+     *   A, T1 -> B, T2 -> C, T3 -> A
+     *
+     * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would
+     * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality
+     * for T3 is 2.
+     *
+     * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be:
+     *  A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3)
+     *  B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
+     *  C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
+     *
+     * @throws RuntimeException if this method is called using a base token which does not belong to this replica
+     */
+    public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+    {
+        AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
+
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        List<InetAddress> localBaseEndpoints = new ArrayList<>();
+        List<InetAddress> localViewEndpoints = new ArrayList<>();
+        for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
+        {
+            if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
+                localBaseEndpoints.add(baseEndpoint);
+        }
+
+        for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
+        {
+            // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
+            if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
+                return viewEndpoint;
+
+            // We have to remove any endpoint which is shared between the base and the view, as it will select itself
+            // and throw off the counts otherwise.
+            if (localBaseEndpoints.contains(viewEndpoint))
+                localBaseEndpoints.remove(viewEndpoint);
+            else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
+                localViewEndpoints.add(viewEndpoint);
+        }
+
+        // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
+        // Since the same replication strategy is used, the same placement should be used and we should get the same
+        // number of replicas for all of the tokens in the ring.
+        assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
+        int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
+        if (baseIdx < 0)
+            throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
+
+        return localViewEndpoints.get(baseIdx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
new file mode 100644
index 0000000..53e4e91
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.CBuilder;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Conflicts;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Represents a single CQL Row in a base table, with both the currently persisted value and the update's value. The
+ * values are stored in timestamp order, but also indicate whether they are from the currently persisted, allowing a
+ * {@link TemporalRow.Resolver} to resolve if the value is an old value that has been updated; if it sorts after the
+ * update's value, then it does not qualify.
+ */
+public class TemporalRow
+{
+    private static final int NO_TTL = LivenessInfo.NO_TTL;
+    private static final long NO_TIMESTAMP = LivenessInfo.NO_TIMESTAMP;
+    private static final int NO_DELETION_TIME = DeletionTime.LIVE.localDeletionTime();
+
+    public interface Resolver
+    {
+        /**
+         * @param cells Iterable of all cells for a certain TemporalRow's Cell, in timestamp sorted order
+         * @return      A single TemporalCell from the iterable which satisfies the resolution criteria, or null if
+         *              there is no cell which qualifies
+         */
+        TemporalCell resolve(Iterable<TemporalCell> cells);
+    }
+
+    /**
+     * Returns the first value in the iterable if it is from the set of persisted cells, and the cell which results from
+     * reconciliation of the remaining cells does not have the same value.
+     */
+    public static final Resolver oldValueIfUpdated = cells -> {
+        Iterator<TemporalCell> iterator = cells.iterator();
+        if (!iterator.hasNext())
+            return null;
+
+        TemporalCell initial = iterator.next();
+        if (initial.isNew || !iterator.hasNext())
+            return null;
+
+        TemporalCell value = initial;
+        while (iterator.hasNext())
+            value = value.reconcile(iterator.next());
+
+        return ByteBufferUtil.compareUnsigned(initial.value, value.value) != 0 ? initial : null;
+    };
+
+    public static final Resolver earliest = cells -> {
+        Iterator<TemporalCell> iterator = cells.iterator();
+        if (!iterator.hasNext())
+            return null;
+        return iterator.next();
+    };
+
+    public static final Resolver latest = cells -> {
+        Iterator<TemporalCell> iterator = cells.iterator();
+        if (!iterator.hasNext())
+            return null;
+
+        TemporalCell value = iterator.next();
+        while (iterator.hasNext())
+            value = value.reconcile(iterator.next());
+
+        return value;
+    };
+
+    private static class TemporalCell
+    {
+        public final ByteBuffer value;
+        public final long timestamp;
+        public final int ttl;
+        public final int localDeletionTime;
+        public final boolean isNew;
+
+        private TemporalCell(ByteBuffer value, long timestamp, int ttl, int localDeletionTime, boolean isNew)
+        {
+            this.value = value;
+            this.timestamp = timestamp;
+            this.ttl = ttl;
+            this.localDeletionTime = localDeletionTime;
+            this.isNew = isNew;
+        }
+
+        public TemporalCell reconcile(TemporalCell that)
+        {
+            int now = FBUtilities.nowInSeconds();
+            Conflicts.Resolution resolution = Conflicts.resolveRegular(that.timestamp,
+                                                                       that.isLive(now),
+                                                                       that.localDeletionTime,
+                                                                       that.value,
+                                                                       this.timestamp,
+                                                                       this.isLive(now),
+                                                                       this.localDeletionTime,
+                                                                       this.value);
+            assert resolution != Conflicts.Resolution.MERGE;
+            if (resolution == Conflicts.Resolution.LEFT_WINS)
+                return that;
+            return this;
+        }
+
+        private boolean isLive(int now)
+        {
+            return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && now < localDeletionTime);
+        }
+
+        public Cell cell(ColumnDefinition definition, CellPath cellPath)
+        {
+            return new BufferCell(definition, timestamp, ttl, localDeletionTime, value, cellPath);
+        }
+    }
+
+    private final ColumnFamilyStore baseCfs;
+    private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
+    private final ByteBuffer basePartitionKey;
+    public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+    public final int nowInSec;
+    private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>();
+    private int viewClusteringTtl = NO_TTL;
+    private long viewClusteringTimestamp = NO_TIMESTAMP;
+    private int viewClusteringLocalDeletionTime = NO_DELETION_TIME;
+
+    TemporalRow(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key, Row row, int nowInSec, boolean isNew)
+    {
+        this.baseCfs = baseCfs;
+        this.viewPrimaryKey = viewPrimaryKey;
+        this.basePartitionKey = key;
+        this.nowInSec = nowInSec;
+        clusteringColumns = new HashMap<>();
+        LivenessInfo liveness = row.primaryKeyLivenessInfo();
+        this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME);
+        this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP);
+        this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL);
+
+        List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns();
+        for (int i = 0; i < clusteringDefs.size(); i++)
+        {
+            ColumnDefinition cdef = clusteringDefs.get(i);
+            clusteringColumns.put(cdef.name, row.clustering().get(i));
+
+            addColumnValue(cdef.name, null, NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME, row.clustering().get(i), isNew);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TemporalRow that = (TemporalRow) o;
+
+        if (!clusteringColumns.equals(that.clusteringColumns)) return false;
+        if (!basePartitionKey.equals(that.basePartitionKey)) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = basePartitionKey.hashCode();
+        result = 31 * result + clusteringColumns.hashCode();
+        return result;
+    }
+
+    public void addColumnValue(ColumnIdentifier identifier,
+                               CellPath cellPath,
+                               long timestamp,
+                               int ttl,
+                               int localDeletionTime,
+                               ByteBuffer value,  boolean isNew)
+    {
+        if (!columnValues.containsKey(identifier))
+            columnValues.put(identifier, new HashMap<>());
+
+        Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(identifier);
+
+        if (!innerMap.containsKey(cellPath))
+            innerMap.put(cellPath, new TreeMap<>());
+
+        // If this column is part of the view's primary keys
+        if (viewPrimaryKey.contains(identifier))
+        {
+            this.viewClusteringTtl = minValueIfSet(this.viewClusteringTtl, ttl, NO_TTL);
+            this.viewClusteringTimestamp = minValueIfSet(this.viewClusteringTimestamp, timestamp, NO_TIMESTAMP);
+            this.viewClusteringLocalDeletionTime = minValueIfSet(this.viewClusteringLocalDeletionTime, localDeletionTime, NO_DELETION_TIME);
+        }
+
+        innerMap.get(cellPath).put(timestamp, new TemporalCell(value, timestamp, ttl, localDeletionTime, isNew));
+    }
+
+    private static int minValueIfSet(int existing, int update, int defaultValue)
+    {
+        if (existing == defaultValue)
+            return update;
+        if (update == defaultValue)
+            return existing;
+        return Math.min(existing, update);
+    }
+
+    private static long minValueIfSet(long existing, long update, long defaultValue)
+    {
+        if (existing == defaultValue)
+            return update;
+        if (update == defaultValue)
+            return existing;
+        return Math.min(existing, update);
+    }
+
+    public int viewClusteringTtl()
+    {
+        return viewClusteringTtl;
+    }
+
+    public long viewClusteringTimestamp()
+    {
+        return viewClusteringTimestamp;
+    }
+
+    public int viewClusteringLocalDeletionTime()
+    {
+        return viewClusteringLocalDeletionTime;
+    }
+
+    public void addCell(Cell cell, boolean isNew)
+    {
+        addColumnValue(cell.column().name, cell.path(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), isNew);
+    }
+
+    // The Definition here is actually the *base table* definition
+    public ByteBuffer clusteringValue(ColumnDefinition definition, Resolver resolver)
+    {
+        ColumnDefinition baseDefinition = definition.cfName.equals(baseCfs.name)
+                                          ? definition
+                                          : baseCfs.metadata.getColumnDefinition(definition.name);
+
+        if (baseDefinition.isPartitionKey())
+        {
+            if (baseDefinition.isOnAllComponents())
+                return basePartitionKey;
+            else
+            {
+                CompositeType keyComparator = (CompositeType) baseCfs.metadata.getKeyValidator();
+                ByteBuffer[] components = keyComparator.split(basePartitionKey);
+                return components[baseDefinition.position()];
+            }
+        }
+        else
+        {
+            ColumnIdentifier columnIdentifier = baseDefinition.name;
+
+            if (clusteringColumns.containsKey(columnIdentifier))
+                return clusteringColumns.get(columnIdentifier);
+
+            Collection<org.apache.cassandra.db.rows.Cell> val = values(definition, resolver);
+            if (val != null && val.size() == 1)
+                return Iterables.getOnlyElement(val).value();
+        }
+        return null;
+    }
+
+    public DeletionTime deletionTime(AbstractThreadUnsafePartition partition)
+    {
+        DeletionInfo deletionInfo = partition.deletionInfo();
+        if (!deletionInfo.getPartitionDeletion().isLive())
+            return deletionInfo.getPartitionDeletion();
+
+        Clustering baseClustering = baseClusteringBuilder().build();
+        RangeTombstone clusterTombstone = deletionInfo.rangeCovering(baseClustering);
+        if (clusterTombstone != null)
+            return clusterTombstone.deletionTime();
+
+        Row row = partition.getRow(baseClustering);
+        return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion();
+    }
+
+    public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition, Resolver resolver)
+    {
+        Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(definition.name);
+        if (innerMap == null)
+        {
+            return Collections.emptyList();
+        }
+
+        Collection<org.apache.cassandra.db.rows.Cell> value = new ArrayList<>();
+        for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> pathAndCells : innerMap.entrySet())
+        {
+            TemporalCell cell = resolver.resolve(pathAndCells.getValue().values());
+
+            if (cell != null)
+                value.add(cell.cell(definition, pathAndCells.getKey()));
+        }
+        return value;
+    }
+
+    public Slice baseSlice()
+    {
+        return baseClusteringBuilder().buildSlice();
+    }
+
+    private CBuilder baseClusteringBuilder()
+    {
+        CFMetaData metadata = baseCfs.metadata;
+        CBuilder builder = CBuilder.create(metadata.comparator);
+
+        ByteBuffer[] buffers = new ByteBuffer[clusteringColumns.size()];
+        for (Map.Entry<ColumnIdentifier, ByteBuffer> buffer : clusteringColumns.entrySet())
+            buffers[metadata.getColumnDefinition(buffer.getKey()).position()] = buffer.getValue();
+
+        for (ByteBuffer byteBuffer : buffers)
+            builder = builder.add(byteBuffer);
+
+        return builder;
+    }
+
+    static class Set implements Iterable<TemporalRow>
+    {
+        private final ColumnFamilyStore baseCfs;
+        private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
+        private final ByteBuffer key;
+        public final DecoratedKey dk;
+        private final Map<Clustering, TemporalRow> clusteringToRow;
+        final int nowInSec = FBUtilities.nowInSeconds();
+
+        Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key)
+        {
+            this.baseCfs = baseCfs;
+            this.viewPrimaryKey = viewPrimaryKey;
+            this.key = key;
+            this.dk = baseCfs.partitioner.decorateKey(key);
+            this.clusteringToRow = new HashMap<>();
+        }
+
+        public Iterator<TemporalRow> iterator()
+        {
+            return clusteringToRow.values().iterator();
+        }
+
+        public TemporalRow getClustering(Clustering clustering)
+        {
+            return clusteringToRow.get(clustering);
+        }
+
+        public void addRow(Row row, boolean isNew)
+        {
+            TemporalRow temporalRow = clusteringToRow.get(row.clustering());
+            if (temporalRow == null)
+            {
+                temporalRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row, nowInSec, isNew);
+                clusteringToRow.put(row.clustering(), temporalRow);
+            }
+
+            for (Cell cell: row.cells())
+            {
+                temporalRow.addCell(cell, isNew);
+            }
+        }
+
+        public int size()
+        {
+            return clusteringToRow.size();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 8f52bb1..9c886b0 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -87,7 +87,7 @@ public class KeyspaceMetrics
     public final LatencyMetrics casPropose;
     /** CAS Commit metrics */
     public final LatencyMetrics casCommit;
-    
+
     public final MetricNameFactory factory;
     private Keyspace keyspace;
     

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
new file mode 100644
index 0000000..39a5574
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class MVWriteMetrics extends ClientRequestMetrics
+{
+    public final Counter viewReplicasAttempted;
+    public final Counter viewReplicasSuccess;
+
+    public MVWriteMetrics(String scope) {
+        super(scope);
+        viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
+        viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
+    }
+
+    public void release()
+    {
+        super.release();
+        Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
+        Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index a5f3601..4f15da2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -134,6 +134,8 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
         @Deprecated PAGED_RANGE,
+        BATCHLOG_MUTATION,
+        MATERIALIZED_VIEW_MUTATION,
         // remember to add new verbs at the end, since we serialize by ordinal
         UNUSED_1,
         UNUSED_2,
@@ -145,6 +147,8 @@ public final class MessagingService implements MessagingServiceMBean
     {{
         put(Verb.MUTATION, Stage.MUTATION);
         put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
+        put(Verb.MATERIALIZED_VIEW_MUTATION, Stage.MATERIALIZED_VIEW_MUTATION);
+        put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
         put(Verb.READ_REPAIR, Stage.MUTATION);
         put(Verb.TRUNCATE, Stage.MUTATION);
         put(Verb.PAXOS_PREPARE, Stage.MUTATION);
@@ -203,6 +207,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
 
         put(Verb.MUTATION, Mutation.serializer);
+        put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
+        put(Verb.MATERIALIZED_VIEW_MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
         //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
@@ -229,6 +235,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
     {{
         put(Verb.MUTATION, WriteResponse.serializer);
+        put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
+        put(Verb.MATERIALIZED_VIEW_MUTATION, WriteResponse.serializer);
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
@@ -291,6 +299,8 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE,
                                                                    Verb.MUTATION,
+                                                                   Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
+                                                                   Verb.MATERIALIZED_VIEW_MUTATION,
                                                                    Verb.COUNTER_MUTATION,
                                                                    Verb.READ_REPAIR,
                                                                    Verb.READ,
@@ -618,7 +628,10 @@ public final class MessagingService implements MessagingServiceMBean
                            ConsistencyLevel consistencyLevel,
                            boolean allowHints)
     {
-        assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
+        assert message.verb == Verb.MUTATION
+               || message.verb == Verb.BATCHLOG_MUTATION
+               || message.verb == Verb.MATERIALIZED_VIEW_MUTATION
+               || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
 
         CallbackInfo previous = callbacks.put(messageId,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 7326fa9..1c21e41 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -307,7 +307,9 @@ public final class LegacySchemaMigrator
                                     defaultValidator);
         }
 
-        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs);
+        // The legacy schema did not have views, so we know that we are not loading a materialized view
+        boolean isMaterializedView = false;
+        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, columnDefs);
 
         cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
         cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/MaterializedViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MaterializedViews.java b/src/java/org/apache/cassandra/schema/MaterializedViews.java
new file mode 100644
index 0000000..1c55736
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/MaterializedViews.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.MaterializedViewDefinition;
+
+import static com.google.common.collect.Iterables.filter;
+
+public final class MaterializedViews implements Iterable<MaterializedViewDefinition>
+{
+    private final ImmutableMap<String, MaterializedViewDefinition> materializedViews;
+
+    private MaterializedViews(Builder builder)
+    {
+        materializedViews = builder.materializedViews.build();
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static MaterializedViews none()
+    {
+        return builder().build();
+    }
+
+    public Iterator<MaterializedViewDefinition> iterator()
+    {
+        return materializedViews.values().iterator();
+    }
+
+    public int size()
+    {
+        return materializedViews.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return materializedViews.isEmpty();
+    }
+
+    /**
+     * Get the materialized view with the specified name
+     *
+     * @param name a non-qualified materialized view name
+     * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link MaterializedViewDefinition} otherwise
+     */
+    public Optional<MaterializedViewDefinition> get(String name)
+    {
+        return Optional.ofNullable(materializedViews.get(name));
+    }
+
+    /**
+     * Create a MaterializedViews instance with the provided materialized view added
+     */
+    public MaterializedViews with(MaterializedViewDefinition materializedView)
+    {
+        if (get(materializedView.viewName).isPresent())
+            throw new IllegalStateException(String.format("Materialized View %s already exists", materializedView.viewName));
+
+        return builder().add(this).add(materializedView).build();
+    }
+
+    /**
+     * Creates a MaterializedViews instance with the materializedView with the provided name removed
+     */
+    public MaterializedViews without(String name)
+    {
+        MaterializedViewDefinition materializedView =
+        get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
+
+        return builder().add(filter(this, v -> v != materializedView)).build();
+    }
+
+    /**
+     * Creates a MaterializedViews instance which contains an updated materialized view
+     */
+    public MaterializedViews replace(MaterializedViewDefinition materializedView)
+    {
+        return without(materializedView.viewName).with(materializedView);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        return this == o || (o instanceof MaterializedViews && materializedViews.equals(((MaterializedViews) o).materializedViews));
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return materializedViews.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return materializedViews.values().toString();
+    }
+
+    public static final class Builder
+    {
+        final ImmutableMap.Builder<String, MaterializedViewDefinition> materializedViews = new ImmutableMap.Builder<>();
+
+        private Builder()
+        {
+        }
+
+        public MaterializedViews build()
+        {
+            return new MaterializedViews(this);
+        }
+
+        public Builder add(MaterializedViewDefinition materializedView)
+        {
+            materializedViews.put(materializedView.viewName, materializedView);
+            return this;
+        }
+
+        public Builder add(Iterable<MaterializedViewDefinition> materializedViews)
+        {
+            materializedViews.forEach(this::add);
+            return this;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 2bc7b0c..2150f4a 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -76,12 +76,13 @@ public final class SchemaKeyspace
     public static final String COLUMNS = "columns";
     public static final String DROPPED_COLUMNS = "dropped_columns";
     public static final String TRIGGERS = "triggers";
+    public static final String MATERIALIZED_VIEWS = "materialized_views";
     public static final String TYPES = "types";
     public static final String FUNCTIONS = "functions";
     public static final String AGGREGATES = "aggregates";
 
     public static final List<String> ALL =
-        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES);
+        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES);
 
     private static final CFMetaData Keyspaces =
         compile(KEYSPACES,
@@ -152,6 +153,18 @@ public final class SchemaKeyspace
                 + "trigger_options map<text, text>,"
                 + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
 
+    private static final CFMetaData MaterializedViews =
+        compile(MATERIALIZED_VIEWS,
+                "materialized views definitions",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "view_name text,"
+                + "target_columns list<text>,"
+                + "clustering_columns list<text>,"
+                + "included_columns list<text>,"
+                + "PRIMARY KEY ((keyspace_name), table_name, view_name))");
+
     private static final CFMetaData Types =
         compile(TYPES,
                 "user defined type definitions",
@@ -193,7 +206,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
 
     public static final List<CFMetaData> All =
-        ImmutableList.of(Keyspaces, Tables, Columns, DroppedColumns, Triggers, Types, Functions, Aggregates);
+        ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates);
 
     private static CFMetaData compile(String name, String description, String schema)
     {
@@ -688,6 +701,8 @@ public final class SchemaKeyspace
         Mutation mutation = new Mutation(NAME, getSchemaKSDecoratedKey(keyspace.name));
         for (CFMetaData schemaTable : All)
             mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
+        mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltMaterializedViews, mutation.key(), timestamp, nowInSec));
+        mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.MaterializedViewsBuildsInProgress, mutation.key(), timestamp, nowInSec));
         return mutation;
     }
 
@@ -830,6 +845,9 @@ public final class SchemaKeyspace
 
             for (TriggerMetadata trigger : table.getTriggers())
                 addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
+
+            for (MaterializedViewDefinition materializedView: table.getMaterializedViews())
+                addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
         }
     }
 
@@ -923,6 +941,22 @@ public final class SchemaKeyspace
         for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values())
             addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
 
+        MapDifference<String, MaterializedViewDefinition> materializedViewDiff = materializedViewsDiff(oldTable.getMaterializedViews(), newTable.getMaterializedViews());
+
+        // dropped materialized views
+        for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnLeft().values())
+            dropMaterializedViewFromSchemaMutation(oldTable, materializedView, timestamp, mutation);
+
+        // newly created materialized views
+        for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values())
+            addMaterializedViewToSchemaMutation(oldTable, materializedView, timestamp, mutation);
+
+        // updated materialized views need to be updated
+        for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values())
+        {
+            addUpdatedMaterializedViewDefinitionToSchemaMutation(oldTable, diff.rightValue(), timestamp, mutation);
+        }
+
         return mutation;
     }
 
@@ -937,6 +971,17 @@ public final class SchemaKeyspace
         return Maps.difference(beforeMap, afterMap);
     }
 
+    private static MapDifference<String, MaterializedViewDefinition> materializedViewsDiff(MaterializedViews before, MaterializedViews after)
+    {
+        Map<String, MaterializedViewDefinition> beforeMap = new HashMap<>();
+        before.forEach(v -> beforeMap.put(v.viewName, v));
+
+        Map<String, MaterializedViewDefinition> afterMap = new HashMap<>();
+        after.forEach(v -> afterMap.put(v.viewName, v));
+
+        return Maps.difference(beforeMap, afterMap);
+    }
+
     public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -950,6 +995,9 @@ public final class SchemaKeyspace
         for (TriggerMetadata trigger : table.getTriggers())
             dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
 
+        for (MaterializedViewDefinition materializedView : table.getMaterializedViews())
+            dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation);
+
         return mutation;
     }
 
@@ -1014,8 +1062,12 @@ public final class SchemaKeyspace
         Triggers triggers =
             readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
 
+        MaterializedViews views =
+            readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition);
+
         return createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
-                                                              .triggers(triggers);
+                                                              .triggers(triggers)
+                                                              .materializedViews(views);
     }
 
     public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
@@ -1032,8 +1084,9 @@ public final class SchemaKeyspace
         boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
         boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
         boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+        boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW);
 
-        CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, columns);
+        CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, columns);
 
         Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction"));
         Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
@@ -1221,6 +1274,103 @@ public final class SchemaKeyspace
     }
 
     /*
+     * Global Index metadata serialization/deserialization.
+     */
+
+    private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation)
+            .clustering(table.cfName, materializedView.viewName);
+
+        for (ColumnIdentifier partitionColumn : materializedView.partitionColumns)
+            builder.addListEntry("target_columns", partitionColumn.toString());
+        for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns)
+            builder = builder.addListEntry("clustering_columns", clusteringColumn.toString());
+        for (ColumnIdentifier includedColumn : materializedView.included)
+            builder = builder.addListEntry("included_columns", includedColumn.toString());
+
+        builder.build();
+    }
+
+    private static void dropMaterializedViewFromSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder.deleteRow(MaterializedViews, timestamp, mutation, table.cfName, materializedView.viewName);
+    }
+
+    private static void addUpdatedMaterializedViewDefinitionToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+    {
+        RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation)
+                                   .clustering(table.cfName, materializedView.viewName);
+
+        builder.resetCollection("target_columns");
+        for (ColumnIdentifier partitionColumn : materializedView.partitionColumns)
+            builder.addListEntry("target_columns", partitionColumn.toString());
+
+        builder.resetCollection("clustering_columns");
+        for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns)
+            builder = builder.addListEntry("clustering_columns", clusteringColumn.toString());
+
+        builder.resetCollection("included_columns");
+        for (ColumnIdentifier includedColumn : materializedView.included)
+            builder = builder.addListEntry("included_columns", includedColumn.toString());
+
+        builder.build();
+    }
+
+    /**
+     * Deserialize materialized views from storage-level representation.
+     *
+     * @param partition storage-level partition containing the materialized view definitions
+     * @return the list of processed MaterializedViewDefinitions
+     */
+    private static MaterializedViews createMaterializedViewsFromMaterializedViewsPartition(RowIterator partition)
+    {
+        MaterializedViews.Builder views = org.apache.cassandra.schema.MaterializedViews.builder();
+        String query = String.format("SELECT * FROM %s.%s", NAME, MATERIALIZED_VIEWS);
+        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
+        {
+            MaterializedViewDefinition mv = createMaterializedViewFromMaterializedViewRow(row);
+            views.add(mv);
+        }
+        return views.build();
+    }
+
+    private static MaterializedViewDefinition createMaterializedViewFromMaterializedViewRow(UntypedResultSet.Row row)
+    {
+        String name = row.getString("view_name");
+        List<String> partitionColumnNames = row.getList("target_columns", UTF8Type.instance);
+
+        String cfName = row.getString("columnfamily_name");
+        List<String> clusteringColumnNames = row.getList("clustering_columns", UTF8Type.instance);
+
+        List<ColumnIdentifier> partitionColumns = new ArrayList<>();
+        for (String columnName : partitionColumnNames)
+        {
+            partitionColumns.add(ColumnIdentifier.getInterned(columnName, true));
+        }
+
+        List<ColumnIdentifier> clusteringColumns = new ArrayList<>();
+        for (String columnName : clusteringColumnNames)
+        {
+            clusteringColumns.add(ColumnIdentifier.getInterned(columnName, true));
+        }
+
+        List<String> includedColumnNames = row.getList("included_columns", UTF8Type.instance);
+        Set<ColumnIdentifier> includedColumns = new HashSet<>();
+        if (includedColumnNames != null)
+        {
+            for (String columnName : includedColumnNames)
+                includedColumns.add(ColumnIdentifier.getInterned(columnName, true));
+        }
+
+        return new MaterializedViewDefinition(cfName,
+                                              name,
+                                              partitionColumns,
+                                              clusteringColumns,
+                                              includedColumns);
+    }
+
+    /*
      * UDF metadata serialization/deserialization.
      */
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 6696e10..9a57f45 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -196,7 +196,7 @@ public abstract class AbstractReadExecutor
             return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
     }
 
-    private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
+    public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
         public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 8978034..e3ba66e 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -47,7 +47,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     public final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
     protected final Collection<InetAddress> pendingEndpoints;
-    private final WriteType writeType;
+    protected final WriteType writeType;
     private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
         = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
     private volatile int failures = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
new file mode 100644
index 0000000..ac44923
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.net.MessageIn;
+
+public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
+{
+    AbstractWriteResponseHandler<T> wrapped;
+    BatchlogCleanup cleanup;
+    protected volatile int requiredBeforeFinish;
+    private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> requiredBeforeFinishUpdater
+            = AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, "requiredBeforeFinish");
+
+    public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup)
+    {
+        super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType);
+        this.wrapped = wrapped;
+        this.requiredBeforeFinish = requiredBeforeFinish;
+        this.cleanup = cleanup;
+    }
+
+    protected int ackCount()
+    {
+        return wrapped.ackCount();
+    }
+
+    public void response(MessageIn<T> msg)
+    {
+        wrapped.response(msg);
+        if (requiredBeforeFinishUpdater.decrementAndGet(this) == 0)
+            cleanup.run();
+    }
+
+    public boolean isLatencyForSnitch()
+    {
+        return wrapped.isLatencyForSnitch();
+    }
+
+    public void onFailure(InetAddress from)
+    {
+        wrapped.onFailure(from);
+    }
+
+    public void assureSufficientLiveNodes()
+    {
+        wrapped.assureSufficientLiveNodes();
+    }
+
+    public void get() throws WriteTimeoutException, WriteFailureException
+    {
+        wrapped.get();
+    }
+
+    protected int totalBlockFor()
+    {
+        return wrapped.totalBlockFor();
+    }
+
+    protected int totalEndpoints()
+    {
+        return wrapped.totalEndpoints();
+    }
+
+    protected boolean waitingFor(InetAddress from)
+    {
+        return wrapped.waitingFor(from);
+    }
+
+    protected void signal()
+    {
+        wrapped.signal();
+    }
+
+    public static class BatchlogCleanup
+    {
+        private final BatchlogCleanupCallback callback;
+
+        protected volatile int mutationsWaitingFor;
+        private static final AtomicIntegerFieldUpdater<BatchlogCleanup> mutationsWaitingForUpdater
+            = AtomicIntegerFieldUpdater.newUpdater(BatchlogCleanup.class, "mutationsWaitingFor");
+
+        public BatchlogCleanup(int mutationsWaitingFor, BatchlogCleanupCallback callback)
+        {
+            this.mutationsWaitingFor = mutationsWaitingFor;
+            this.callback = callback;
+        }
+
+        public void run()
+        {
+            if (mutationsWaitingForUpdater.decrementAndGet(this) == 0)
+                callback.invoke();
+        }
+    }
+
+    public interface BatchlogCleanupCallback
+    {
+        void invoke();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 548cbc7..fddf593 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -287,6 +287,24 @@ public class CassandraDaemon
             }
         }
 
+        Runnable indexRebuild = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                for (Keyspace keyspace : Keyspace.all())
+                {
+                    for (ColumnFamilyStore cf: keyspace.getColumnFamilyStores())
+                    {
+                        cf.materializedViewManager.buildAllViews();
+                    }
+                }
+            }
+        };
+
+        ScheduledExecutors.optionalTasks.schedule(indexRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+
+
         SystemKeyspace.finishStartup();
 
         // start server internals

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2c5a2ab..be11c77 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,6 +31,10 @@ import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.db.view.MaterializedViewUtils;
+import org.apache.cassandra.metrics.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +62,6 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -91,6 +94,7 @@ public class StorageProxy implements StorageProxyMBean
     private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
     private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
     private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
+    private static final MVWriteMetrics mvWriteMetrics = new MVWriteMetrics("MVWrite");
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
@@ -118,7 +122,7 @@ public class StorageProxy implements StorageProxyMBean
             throws OverloadedException
             {
                 assert mutation instanceof Mutation;
-                sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter);
+                sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
             }
         };
 
@@ -622,6 +626,80 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
     }
 
+    /**
+     * Use this method to have these Mutations applied
+     * across all replicas.
+     *
+     * @param mutations the mutations to be applied across the replicas
+     */
+    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations)
+    throws UnavailableException, OverloadedException, WriteTimeoutException
+    {
+        Tracing.trace("Determining replicas for mutation");
+        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+
+        long startTime = System.nanoTime();
+        List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
+
+        try
+        {
+            Token baseToken = StorageService.getPartitioner().getToken(dataKey);
+
+            ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+
+            //Since the base -> view replication is 1:1 we only need to store the BL locally
+            final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
+            final UUID batchUUID = UUIDGen.getTimeUUID();
+            BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
+                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+
+            // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
+            for (Mutation mutation : mutations)
+            {
+                String keyspaceName = mutation.getKeyspaceName();
+                Token tk = mutation.key().getToken();
+                List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk));
+
+                WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
+                                                                                 consistencyLevel,
+                                                                                 consistencyLevel,
+                                                                                 naturalEndpoints,
+                                                                                 WriteType.BATCH,
+                                                                                 cleanup);
+
+                wrappers.add(wrapper);
+
+                //Apply to local batchlog memtable in this thread
+                BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+            }
+
+            // now actually perform the writes and wait for them to complete
+            asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+        }
+        catch (WriteTimeoutException ex)
+        {
+            mvWriteMetrics.timeouts.mark();
+            Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+            throw ex;
+        }
+        catch (UnavailableException e)
+        {
+            mvWriteMetrics.unavailables.mark();
+            Tracing.trace("Unavailable");
+            throw e;
+        }
+        catch (OverloadedException e)
+        {
+            mvWriteMetrics.unavailables.mark();
+            Tracing.trace("Overloaded");
+            throw e;
+        }
+        finally
+        {
+            mvWriteMetrics.addNano(System.nanoTime() - startTime);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
                                           ConsistencyLevel consistencyLevel,
@@ -630,12 +708,17 @@ public class StorageProxy implements StorageProxyMBean
     {
         Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
 
+        boolean updatesView = MaterializedViewManager.updatesAffectView(mutations, true);
+
         if (augmented != null)
-            mutateAtomically(augmented, consistencyLevel);
-        else if (mutateAtomically)
-            mutateAtomically((Collection<Mutation>) mutations, consistencyLevel);
+            mutateAtomically(augmented, consistencyLevel, updatesView);
         else
-            mutate(mutations, consistencyLevel);
+        {
+            if (mutateAtomically || updatesView)
+                mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView);
+            else
+                mutate(mutations, consistencyLevel);
+        }
     }
 
     /**
@@ -646,8 +729,11 @@ public class StorageProxy implements StorageProxyMBean
      *
      * @param mutations the Mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
+     * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog
      */
-    public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level)
+    public static void mutateAtomically(Collection<Mutation> mutations,
+                                        ConsistencyLevel consistency_level,
+                                        boolean requireQuorumForRemove)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for atomic batch");
@@ -658,25 +744,49 @@ public class StorageProxy implements StorageProxyMBean
 
         try
         {
+
+            // If we are requiring quorum nodes for removal, we upgrade consistency level to QUORUM unless we already
+            // require ALL, or EACH_QUORUM. This is so that *at least* QUORUM nodes see the update.
+            ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove
+                                                     ? ConsistencyLevel.QUORUM
+                                                     : consistency_level;
+
+            switch (consistency_level)
+            {
+                case ALL:
+                case EACH_QUORUM:
+                    batchConsistencyLevel = consistency_level;
+            }
+
+            final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+            final UUID batchUUID = UUIDGen.getTimeUUID();
+            BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
+                                                                                                          new BatchlogResponseHandler.BatchlogCleanupCallback()
+                                                                                                          {
+                                                                                                              public void invoke()
+                                                                                                              {
+                                                                                                                  asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
+                                                                                                              }
+                                                                                                          });
+
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
             {
-                WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level, WriteType.BATCH);
+                WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation,
+                                                                               consistency_level,
+                                                                               batchConsistencyLevel,
+                                                                               WriteType.BATCH,
+                                                                               cleanup);
                 // exit early if we can't fulfill the CL at this time.
                 wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
             }
 
             // write to the batchlog
-            Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, consistency_level);
-            UUID batchUUID = UUIDGen.getTimeUUID();
             syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID);
 
             // now actually perform the writes and wait for them to complete
-            syncWriteBatchedMutations(wrappers, localDataCenter);
-
-            // remove the batchlog entries asynchronously
-            asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
+            syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
         }
         catch (UnavailableException e)
         {
@@ -719,13 +829,13 @@ public class StorageProxy implements StorageProxyMBean
                                                                         WriteType.BATCH_LOG);
 
         MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
-                                                      .createMessage();
+                                                      .createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
         for (InetAddress target : endpoints)
         {
             int targetVersion = MessagingService.instance().getVersion(target);
             if (canDoLocalRequest(target))
             {
-                insertLocal(message.payload, handler);
+                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
             }
             else if (targetVersion == MessagingService.current_version)
             {
@@ -734,7 +844,7 @@ public class StorageProxy implements StorageProxyMBean
             else
             {
                 MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
-                                                                  .createMessage(),
+                                                                  .createMessage(MessagingService.Verb.BATCHLOG_MUTATION),
                                                    target,
                                                    handler,
                                                    false);
@@ -754,25 +864,43 @@ public class StorageProxy implements StorageProxyMBean
                                                                         WriteType.SIMPLE);
         Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid)));
         mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
-        MessageOut<Mutation> message = mutation.createMessage();
+        MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
         for (InetAddress target : endpoints)
         {
             if (canDoLocalRequest(target))
-                insertLocal(message.payload, handler);
+                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
             else
                 MessagingService.instance().sendRR(message, target, handler, false);
         }
     }
 
-    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter)
+    private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
+    {
+        for (WriteResponseHandlerWrapper wrapper : wrappers)
+        {
+            Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
+
+            try
+            {
+                sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
+            }
+            catch (OverloadedException | WriteTimeoutException e)
+            {
+                wrapper.handler.onFailure(FBUtilities.getBroadcastAddress());
+            }
+        }
+    }
+
+    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
     throws WriteTimeoutException, OverloadedException
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
             Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
-            sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter);
+            sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
         }
 
+
         for (WriteResponseHandlerWrapper wrapper : wrappers)
             wrapper.handler.get();
     }
@@ -815,25 +943,52 @@ public class StorageProxy implements StorageProxyMBean
         return responseHandler;
     }
 
-    // same as above except does not initiate writes (but does perform availability checks).
-    private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
+    // same as performWrites except does not initiate writes (but does perform availability checks).
+    private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation,
+                                                                        ConsistencyLevel consistency_level,
+                                                                        ConsistencyLevel batchConsistencyLevel,
+                                                                        WriteType writeType,
+                                                                        BatchlogResponseHandler.BatchlogCleanup cleanup)
     {
-        AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
+        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
-        return new WriteResponseHandlerWrapper(responseHandler, mutation);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+        BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        return new WriteResponseHandlerWrapper(batchHandler, mutation);
+    }
+
+    /**
+     * Same as performWrites except does not initiate writes (but does perform availability checks).
+     * Keeps track of MVWriteMetrics
+     */
+    private static WriteResponseHandlerWrapper wrapMVBatchResponseHandler(Mutation mutation,
+                                                                          ConsistencyLevel consistency_level,
+                                                                          ConsistencyLevel batchConsistencyLevel,
+                                                                          List<InetAddress> naturalEndpoints,
+                                                                          WriteType writeType,
+                                                                          BatchlogResponseHandler.BatchlogCleanup cleanup)
+    {
+        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
+        String keyspaceName = mutation.getKeyspaceName();
+        Token tk = mutation.key().getToken();
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+        BatchlogResponseHandler<IMutation> batchHandler = new MVWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
     // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints.
     private static class WriteResponseHandlerWrapper
     {
-        final AbstractWriteResponseHandler<IMutation> handler;
+        final BatchlogResponseHandler<IMutation> handler;
         final Mutation mutation;
 
-        WriteResponseHandlerWrapper(AbstractWriteResponseHandler<IMutation> handler, Mutation mutation)
+        WriteResponseHandlerWrapper(BatchlogResponseHandler<IMutation> handler, Mutation mutation)
         {
             this.handler = handler;
             this.mutation = mutation;
@@ -886,7 +1041,8 @@ public class StorageProxy implements StorageProxyMBean
     public static void sendToHintedEndpoints(final Mutation mutation,
                                              Iterable<InetAddress> targets,
                                              AbstractWriteResponseHandler<IMutation> responseHandler,
-                                             String localDataCenter)
+                                             String localDataCenter,
+                                             Stage stage)
     throws OverloadedException
     {
         // extra-datacenter replicas, grouped by dc
@@ -950,7 +1106,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         if (insertLocal)
-            insertLocal(mutation, responseHandler);
+            insertLocal(stage, mutation, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1059,10 +1215,9 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
+    private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
     {
-
-        StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
         {
             public void runMayThrow()
             {
@@ -1073,7 +1228,8 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 catch (Exception ex)
                 {
-                    logger.error("Failed to apply mutation locally : {}", ex);
+                    if (!(ex instanceof WriteTimeoutException))
+                        logger.error("Failed to apply mutation locally : {}", ex);
                     responseHandler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
@@ -1193,7 +1349,7 @@ public class StorageProxy implements StorageProxyMBean
                 Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
                                                            ImmutableSet.of(FBUtilities.getBroadcastAddress()));
                 if (!remotes.isEmpty())
-                    sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
+                    sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
             }
         };
     }
@@ -2123,6 +2279,24 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
+     * This class captures metrics for materialized views writes.
+     */
+    private static class MVWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
+    {
+        public MVWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
+        {
+            super(writeHandler, i, cleanup);
+            mvWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+        }
+
+        public void response(MessageIn<IMutation> msg)
+        {
+            super.response(msg);
+            mvWriteMetrics.viewReplicasSuccess.inc();
+        }
+    }
+
+    /**
      * A Runnable that aborts if it doesn't start running before it times out
      */
     private static abstract class DroppableRunnable implements Runnable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index da53bf7..5049337 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -297,6 +297,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         /* register the verb handlers */
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MATERIALIZED_VIEW_MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler());
@@ -629,9 +631,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             public void runMayThrow() throws InterruptedException
             {
                 inShutdownHook = true;
+                ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
+                ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
                 ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
                 ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-                if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
+                if (mutationStage.isShutdown()
+                    && counterMutationStage.isShutdown()
+                    && batchlogMutationStage.isShutdown()
+                    && materializedViewMutationStage.isShutdown())
                     return; // drained already
 
                 if (daemon != null)
@@ -642,8 +649,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // In-progress writes originating here could generate hints to be written, so shut down MessagingService
                 // before mutation stage, so we can get all the hints saved before shutting down
                 MessagingService.instance().shutdown();
+                materializedViewMutationStage.shutdown();
+                batchlogMutationStage.shutdown();
                 counterMutationStage.shutdown();
                 mutationStage.shutdown();
+                materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+                batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
@@ -3820,8 +3831,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         inShutdownHook = true;
         
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
+        ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
+        ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-        if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
+        if (mutationStage.isTerminated()
+            && counterMutationStage.isTerminated()
+            && batchlogMutationStage.isTerminated()
+            && materializedViewMutationStage.isTerminated())
         {
             logger.warn("Cannot drain node (did it already happen?)");
             return;
@@ -3835,8 +3851,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().shutdown();
 
         setMode(Mode.DRAINING, "clearing mutation stage", false);
+        materializedViewMutationStage.shutdown();
+        batchlogMutationStage.shutdown();
         counterMutationStage.shutdown();
         mutationStage.shutdown();
+        materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+        batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);