You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 16:35:15 UTC

[4/6] cassandra git commit: Improve MV schema representation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
deleted file mode 100644
index e23fd84..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-public class MaterializedViewBuilder extends CompactionInfo.Holder
-{
-    private final ColumnFamilyStore baseCfs;
-    private final MaterializedView view;
-    private final UUID compactionId;
-    private volatile Token prevToken = null;
-
-    private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class);
-
-    private volatile boolean isStopped = false;
-
-    public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view)
-    {
-        this.baseCfs = baseCfs;
-        this.view = view;
-        compactionId = UUIDGen.getTimeUUID();
-    }
-
-    private void buildKey(DecoratedKey key)
-    {
-        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
-
-        while (!pager.isExhausted())
-        {
-           try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
-           {
-               if (!partitionIterator.hasNext())
-                   return;
-
-               try (RowIterator rowIterator = partitionIterator.next())
-               {
-                   FilteredPartition partition = FilteredPartition.create(rowIterator);
-                   TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
-
-                   Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
-
-                   if (mutations != null)
-                       StorageProxy.mutateMV(key.getKey(), mutations, true);
-               }
-           }
-        }
-    }
-
-    public void run()
-    {
-        String ksname = baseCfs.metadata.ksName, viewName = view.name;
-
-        if (SystemKeyspace.isViewBuilt(ksname, viewName))
-            return;
-
-        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
-        final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName);
-        Token lastToken;
-        Function<View, Iterable<SSTableReader>> function;
-        if (buildStatus == null)
-        {
-            baseCfs.forceBlockingFlush();
-            function = View.select(SSTableSet.CANONICAL);
-            int generation = Integer.MIN_VALUE;
-
-            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
-            {
-                for (SSTableReader reader : temp)
-                {
-                    generation = Math.max(reader.descriptor.generation, generation);
-                }
-            }
-
-            SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation);
-            lastToken = null;
-        }
-        else
-        {
-            function = new Function<View, Iterable<SSTableReader>>()
-            {
-                @Nullable
-                public Iterable<SSTableReader> apply(View view)
-                {
-                    Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view);
-                    if (readers != null)
-                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
-                    return null;
-                }
-            };
-            lastToken = buildStatus.right;
-        }
-
-        prevToken = lastToken;
-        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
-             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
-        {
-            while (!isStopped && iter.hasNext())
-            {
-                DecoratedKey key = iter.next();
-                Token token = key.getToken();
-                if (lastToken == null || lastToken.compareTo(token) < 0)
-                {
-                    for (Range<Token> range : ranges)
-                    {
-                        if (range.contains(token))
-                        {
-                            buildKey(key);
-
-                            if (prevToken == null || prevToken.compareTo(token) != 0)
-                            {
-                                SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken());
-                                prevToken = token;
-                            }
-                        }
-                    }
-                    lastToken = null;
-                }
-            }
-
-            if (!isStopped)
-                SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName);
-
-        }
-        catch (Exception e)
-        {
-            final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view);
-            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder),
-                                                         5,
-                                                         TimeUnit.MINUTES);
-            logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
-        }
-    }
-
-    public CompactionInfo getCompactionInfo()
-    {
-        long rangesLeft = 0, rangesTotal = 0;
-        Token lastToken = prevToken;
-
-        // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
-        // percentage of a range covered by a second range, this is the best approximation that we can calculate.
-        // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
-        // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
-        // has.
-        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
-        {
-            rangesLeft++;
-            rangesTotal++;
-            // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
-            // end of the method.
-            if (lastToken == null || range.contains(lastToken))
-                rangesLeft = 0;
-        }
-        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
-    }
-
-    public void stop()
-    {
-        isStopped = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
deleted file mode 100644
index 41f4ed0..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.Lock;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Striped;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-
-/**
- * Manages the {@link MaterializedView}s for a single {@link ColumnFamilyStore}. All of the materialized views for that
- * table are created when this manager is initialized.
- *
- * The main purposes of the manager are to provide a single location for updates to be vetted to see whether they update
- * any views {@link MaterializedViewManager#updateAffectsView(PartitionUpdate)}, provide locks to prevent multiple
- * updates from creating incoherent updates in the view {@link MaterializedViewManager#acquireLockFor(ByteBuffer)}, and
- * to effect changes on the view.
- */
-public class MaterializedViewManager
-{
-    private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);
-    private static final boolean enableCoordinatorBatchlog = Boolean.getBoolean("cassandra.mv_enable_coordinator_batchlog");
-
-    private final ConcurrentNavigableMap<String, MaterializedView> viewsByName;
-
-    private final ColumnFamilyStore baseCfs;
-
-    public MaterializedViewManager(ColumnFamilyStore baseCfs)
-    {
-        this.viewsByName = new ConcurrentSkipListMap<>();
-
-        this.baseCfs = baseCfs;
-    }
-
-    public Iterable<MaterializedView> allViews()
-    {
-        return viewsByName.values();
-    }
-
-    public Iterable<ColumnFamilyStore> allViewsCfs()
-    {
-        List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
-        for (MaterializedView view : allViews())
-            viewColumnFamilies.add(view.getViewCfs());
-        return viewColumnFamilies;
-    }
-
-    public void init()
-    {
-        reload();
-    }
-
-    public void invalidate()
-    {
-        for (MaterializedView view : allViews())
-            removeMaterializedView(view.name);
-    }
-
-    public void reload()
-    {
-        Map<String, MaterializedViewDefinition> newViewsByName = new HashMap<>();
-        for (MaterializedViewDefinition definition : baseCfs.metadata.getMaterializedViews())
-        {
-            newViewsByName.put(definition.viewName, definition);
-        }
-
-        for (String viewName : viewsByName.keySet())
-        {
-            if (!newViewsByName.containsKey(viewName))
-                removeMaterializedView(viewName);
-        }
-
-        for (Map.Entry<String, MaterializedViewDefinition> entry : newViewsByName.entrySet())
-        {
-            if (!viewsByName.containsKey(entry.getKey()))
-                addMaterializedView(entry.getValue());
-        }
-
-        for (MaterializedView view : allViews())
-        {
-            view.build();
-            // We provide the new definition from the base metadata
-            view.updateDefinition(newViewsByName.get(view.name));
-        }
-    }
-
-    public void buildAllViews()
-    {
-        for (MaterializedView view : allViews())
-            view.build();
-    }
-
-    public void removeMaterializedView(String name)
-    {
-        MaterializedView view = viewsByName.remove(name);
-
-        if (view == null)
-            return;
-
-        SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, view.name);
-    }
-
-    public void addMaterializedView(MaterializedViewDefinition definition)
-    {
-        MaterializedView view = new MaterializedView(definition, baseCfs);
-
-        viewsByName.put(definition.viewName, view);
-    }
-
-    /**
-     * Calculates and pushes updates to the views replicas. The replicas are determined by
-     * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
-     */
-    public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
-    {
-        List<Mutation> mutations = null;
-        TemporalRow.Set temporalRows = null;
-        for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
-        {
-
-            temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false);
-
-            Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
-            if (viewMutations != null && !viewMutations.isEmpty())
-            {
-                if (mutations == null)
-                    mutations = Lists.newLinkedList();
-                mutations.addAll(viewMutations);
-            }
-        }
-        if (mutations != null)
-        {
-            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
-        }
-    }
-
-    public boolean updateAffectsView(PartitionUpdate upd)
-    {
-        for (MaterializedView view : allViews())
-        {
-            if (view.updateAffectsView(upd))
-                return true;
-        }
-        return false;
-    }
-
-    public static Lock acquireLockFor(ByteBuffer key)
-    {
-        Lock lock = LOCKS.get(key);
-
-        if (lock.tryLock())
-            return lock;
-
-        return null;
-    }
-
-    public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean coordinatorBatchlog)
-    {
-        if (coordinatorBatchlog && !enableCoordinatorBatchlog)
-            return false;
-
-        for (IMutation mutation : mutations)
-        {
-            for (PartitionUpdate cf : mutation.getPartitionUpdates())
-            {
-                Keyspace keyspace = Keyspace.open(cf.metadata().ksName);
-
-                if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
-                    continue;
-
-                MaterializedViewManager viewManager = keyspace.getColumnFamilyStore(cf.metadata().cfId).materializedViewManager;
-                if (viewManager.updateAffectsView(cf))
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
-
-    public void forceBlockingFlush()
-    {
-        for (ColumnFamilyStore viewCfs : allViewsCfs())
-            viewCfs.forceBlockingFlush();
-    }
-
-    public void dumpMemtables()
-    {
-        for (ColumnFamilyStore viewCfs : allViewsCfs())
-            viewCfs.dumpMemtable();
-    }
-
-    public void truncateBlocking(long truncatedAt)
-    {
-        for (ColumnFamilyStore viewCfs : allViewsCfs())
-        {
-            ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
-            SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
deleted file mode 100644
index ea81750..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-public final class MaterializedViewUtils
-{
-    private MaterializedViewUtils()
-    {
-    }
-
-    /**
-     * Calculate the natural endpoint for the view.
-     *
-     * The view natural endpoint is the endpoint which has the same cardinality as this node in the replication factor.
-     * The cardinality is the number at which this node would store a piece of data, given the change in replication
-     * factor.
-     *
-     * For example, if we have the following ring:
-     *   A, T1 -> B, T2 -> C, T3 -> A
-     *
-     * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would
-     * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality
-     * for T3 is 2.
-     *
-     * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be:
-     *  A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3)
-     *  B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
-     *  C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
-     *
-     * @throws RuntimeException if this method is called using a base token which does not belong to this replica
-     */
-    public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
-    {
-        AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
-
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-        List<InetAddress> localBaseEndpoints = new ArrayList<>();
-        List<InetAddress> localViewEndpoints = new ArrayList<>();
-        for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
-        {
-            if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
-                localBaseEndpoints.add(baseEndpoint);
-        }
-
-        for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
-        {
-            // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
-            if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
-                return viewEndpoint;
-
-            // We have to remove any endpoint which is shared between the base and the view, as it will select itself
-            // and throw off the counts otherwise.
-            if (localBaseEndpoints.contains(viewEndpoint))
-                localBaseEndpoints.remove(viewEndpoint);
-            else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
-                localViewEndpoints.add(viewEndpoint);
-        }
-
-        // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
-        // Since the same replication strategy is used, the same placement should be used and we should get the same
-        // number of replicas for all of the tokens in the ring.
-        assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
-        int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
-
-        if (baseIdx < 0)
-        {
-
-            if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0)
-            {
-                //Since there are pending endpoints we are going to store this in the batchlog regardless.
-                //So we can pretend we are the view's endpoint.
-
-                return FBUtilities.getBroadcastAddress();
-            }
-
-            throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
-        }
-
-
-        return localViewEndpoints.get(baseIdx);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
new file mode 100644
index 0000000..7bcb592
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
+import org.apache.cassandra.db.CBuilder;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.service.pager.QueryPager;
+
+/**
+ * A View copies data from a base table into a view table which can be queried independently from the
+ * base. Every update which targets the base table must be fed through the {@link ViewManager} to ensure
+ * that if a view needs to be updated, the updates are properly created and fed into the view.
+ *
+ * This class does the job of translating the base row to the view row.
+ *
+ * It handles reading existing state and figuring out what tombstones need to be generated.
+ *
+ * {@link View#createMutations(AbstractBTreePartition, TemporalRow.Set, boolean)} is the "main method"
+ *
+ */
+public class View
+{
+    /**
+     * The columns should all be updated together, so we use this object as a group.
+     */
+    private static class Columns
+    {
+        //These are the base column definitions in terms of the *view's* partitioning.
+        //This means that (for example) the partition key of the view may contain a clustering key
+        //from the base table.
+        public final List<ColumnDefinition> partitionDefs;
+        public final List<ColumnDefinition> primaryKeyDefs;
+        public final List<ColumnDefinition> baseComplexColumns;
+
+        private Columns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
+        {
+            this.partitionDefs = partitionDefs;
+            this.primaryKeyDefs = primaryKeyDefs;
+            this.baseComplexColumns = baseComplexColumns;
+        }
+    }
+
+    public final String name;
+    private volatile ViewDefinition definition;
+
+    private final ColumnFamilyStore baseCfs;
+
+    private Columns columns;
+
+    private final boolean viewHasAllPrimaryKeys;
+    private final boolean includeAllColumns;
+    private ViewBuilder builder;
+
+    public View(ViewDefinition definition,
+                ColumnFamilyStore baseCfs)
+    {
+        this.baseCfs = baseCfs;
+
+        name = definition.viewName;
+        includeAllColumns = definition.includeAllColumns;
+
+        viewHasAllPrimaryKeys = updateDefinition(definition);
+    }
+
+    public ViewDefinition getDefinition()
+    {
+        return definition;
+    }
+
+    /**
+     * Lookup column definitions in the base table that correspond to the view columns (should be 1:1)
+     *
+     * Reports to the caller whether all of the view's primary key columns are also primary key columns in the
+     * base. We do this to simplify tombstone checks.
+     *
+     * @param columns a list of columns to lookup in the base table
+     * @param definitions lists to populate for the base table definitions
+     * @return true if all view PKs are also Base PKs
+     */
+    private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
+    {
+        boolean allArePrimaryKeys = true;
+        for (ColumnIdentifier identifier : columns)
+        {
+            ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
+            assert cdef != null : "Could not resolve column " + identifier.toString();
+
+            for (List<ColumnDefinition> list : definitions)
+            {
+                list.add(cdef);
+            }
+
+            allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
+        }
+
+        return allArePrimaryKeys;
+    }
+
+    /**
+     * This updates the columns stored which are dependent on the base CFMetaData.
+     *
+     * @return true if every column of the view's primary key is also part of the base's primary key; false if at
+     *         least one view primary key column is not.
+     */
+    public boolean updateDefinition(ViewDefinition definition)
+    {
+        this.definition = definition;
+
+        CFMetaData viewCfm = definition.metadata;
+
+        // Pre-size the lists from the view's own key layout.
+        List<ColumnDefinition> viewPartitionDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size());
+        List<ColumnDefinition> viewPrimaryKeyDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size()
+                                                                    + viewCfm.clusteringColumns().size());
+
+        // Partition key columns are recorded in both lists (primary key list first); clustering columns are
+        // recorded in the primary key list only.
+        boolean partitionKeyInBasePk = resolveAndAddColumns(Iterables.transform(viewCfm.partitionKeyColumns(), cd -> cd.name), viewPrimaryKeyDefs, viewPartitionDefs);
+        boolean clusteringInBasePk = resolveAndAddColumns(Iterables.transform(viewCfm.clusteringColumns(), cd -> cd.name), viewPrimaryKeyDefs);
+
+        // Remember every complex column of the base table.
+        List<ColumnDefinition> complexBaseColumns = new ArrayList<>();
+        for (ColumnDefinition baseColumn : baseCfs.metadata.allColumns())
+        {
+            if (!baseColumn.isComplex())
+                continue;
+
+            complexBaseColumns.add(baseColumn);
+        }
+
+        this.columns = new Columns(viewPartitionDefs, viewPrimaryKeyDefs, complexBaseColumns);
+
+        return partitionKeyInBasePk && clusteringInBasePk;
+    }
+
+    /**
+     * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
+     * <ul>
+     *     <li>View selects all columns</li>
+     *     <li>Update contains any range tombstones</li>
+     *     <li>Update touches one of the columns included in the view</li>
+     * </ul>
+     *
+     * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
+     * currently included in the view.
+     *
+     * @return true if {@code partition} modifies a column included in the view
+     */
+    public boolean updateAffectsView(AbstractBTreePartition partition)
+    {
+        // If we are including all of the columns, then any update will be included
+        if (includeAllColumns)
+            return true;
+
+        // If there are range tombstones, tombstones will also need to be generated for the view
+        // This requires a query of the base rows and generating tombstones for all of those values
+        if (!partition.deletionInfo().isLive())
+            return true;
+
+        // Check each row for deletion or update
+        for (Row row : partition)
+        {
+            if (row.hasComplexDeletion())
+                return true;
+            if (!row.deletion().isLive())
+                return true;
+
+            for (ColumnData data : row)
+            {
+                if (definition.metadata.getColumnDefinition(data.column().name) != null)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Creates the clustering columns for the view based on the specified row and resolver policy
+     *
+     * @param temporalRow The current row
+     * @param resolver The policy to use when selecting which version of a cell to use
+     * @return The clustering object to use for the view
+     */
+    private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+    {
+        CFMetaData viewCfm = definition.metadata;
+        CBuilder viewClusteringBuilder = CBuilder.create(viewCfm.comparator);
+
+        // Append one resolved value per view clustering column, in the view's declared order.
+        for (ColumnDefinition clusteringColumn : viewCfm.clusteringColumns())
+            viewClusteringBuilder.add(temporalRow.clusteringValue(clusteringColumn, resolver));
+
+        return viewClusteringBuilder.build();
+    }
+
+    /**
+     * @return PartitionUpdate containing a row deletion for the view row addressed by the given partition key and
+     *         TemporalRow.
+     */
+    private PartitionUpdate createTombstone(TemporalRow temporalRow,
+                                            DecoratedKey partitionKey,
+                                            Row.Deletion deletion,
+                                            TemporalRow.Resolver resolver,
+                                            int nowInSec)
+    {
+        CFMetaData viewCfm = definition.metadata;
+
+        // Build a single view row that carries nothing but the supplied row deletion.
+        Row.Builder rowBuilder = BTreeRow.unsortedBuilder(nowInSec);
+        rowBuilder.newRow(viewClustering(temporalRow, resolver));
+        rowBuilder.addRowDeletion(deletion);
+        Row deletedRow = rowBuilder.build();
+
+        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, deletedRow);
+    }
+
+    /**
+     * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
+     */
+    private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
+                                                   DecoratedKey partitionKey,
+                                                   ColumnDefinition deletedColumn,
+                                                   DeletionTime deletionTime,
+                                                   TemporalRow.Resolver resolver,
+                                                   int nowInSec)
+    {
+        CFMetaData viewCfm = definition.metadata;
+
+        // Build a single view row that carries nothing but the complex deletion of the given column.
+        Row.Builder rowBuilder = BTreeRow.unsortedBuilder(nowInSec);
+        rowBuilder.newRow(viewClustering(temporalRow, resolver));
+        rowBuilder.addComplexDeletion(deletedColumn, deletionTime);
+        Row deletedRow = rowBuilder.build();
+
+        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, deletedRow);
+    }
+
+    /**
+     * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
+     *         the TemporalRow and its Resolver
+     */
+    private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+    {
+        List<ColumnDefinition> keyColumns = this.columns.partitionDefs;
+        Object[] keyComponents = new Object[keyColumns.size()];
+
+        int position = 0;
+        for (ColumnDefinition keyColumn : keyColumns)
+        {
+            ByteBuffer component = temporalRow.clusteringValue(keyColumn, resolver);
+
+            // A single unresolvable component means there is no view partition key at all.
+            if (component == null)
+                return null;
+
+            keyComponents[position++] = component;
+        }
+
+        // Serialize the components with the view's key validator and decorate the result.
+        CFMetaData viewCfm = definition.metadata;
+        return viewCfm.decorateKey(CFMetaData.serializePartitionKey(viewCfm
+                                                                    .getKeyValidatorAsClusteringComparator()
+                                                                    .make(keyComponents)));
+    }
+
+    /**
+     * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary.
+     * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one
+     * mutation is necessary
+     */
+    private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
+    {
+        // Primary Key and Clustering columns do not generate tombstones
+        if (viewHasAllPrimaryKeys)
+            return null;
+
+        // A tombstone is only needed when a view primary key column that is NOT a base primary key column
+        // has an old value according to the oldValueIfUpdated resolver.
+        boolean hasUpdate = false;
+        for (ColumnDefinition viewKeyColumn : this.columns.primaryKeyDefs)
+        {
+            if (!viewKeyColumn.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewKeyColumn, TemporalRow.oldValueIfUpdated) != null)
+                hasUpdate = true;
+        }
+
+        if (!hasUpdate)
+            return null;
+
+        TemporalRow.Resolver resolver = TemporalRow.earliest;
+        DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
+
+        // viewPartitionKey() returns null when a key component cannot be resolved. The other callers in this
+        // class skip such rows; do the same here instead of building an update with a null partition key.
+        if (partitionKey == null)
+            return null;
+
+        return createTombstone(temporalRow,
+                               partitionKey,
+                               Row.Deletion.shadowable(new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec)),
+                               resolver,
+                               temporalRow.nowInSec);
+    }
+
+    /**
+     * @return Mutation which is the transformed base table mutation for the view.
+     */
+    private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
+    {
+        TemporalRow.Resolver resolver = TemporalRow.latest;
+
+        DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
+        CFMetaData viewCfm = definition.metadata;
+
+        if (partitionKey == null)
+        {
+            // Not having a partition key means we aren't updating anything
+            return null;
+        }
+
+        Row.Builder regularBuilder = BTreeRow.unsortedBuilder(temporalRow.nowInSec);
+
+        // Reuse the shared helper rather than re-implementing the clustering construction here.
+        regularBuilder.newRow(viewClustering(temporalRow, resolver));
+        regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfm,
+                                                                     temporalRow.viewClusteringTimestamp(),
+                                                                     temporalRow.viewClusteringTtl(),
+                                                                     temporalRow.viewClusteringLocalDeletionTime()));
+
+        // Copy the resolved cells of every view column that is not a primary key column; primary key
+        // columns are already represented by the partition key and the clustering.
+        for (ColumnDefinition columnDefinition : viewCfm.allColumns())
+        {
+            if (columnDefinition.isPrimaryKeyColumn())
+                continue;
+
+            for (Cell cell : temporalRow.values(columnDefinition, resolver))
+            {
+                regularBuilder.addCell(cell);
+            }
+        }
+
+        return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, regularBuilder.build());
+    }
+
+    /**
+     * @param partition Update which possibly contains deletion info for which to generate view tombstones.
+     * @return    View Tombstones which delete all of the rows which have been removed from the base table with
+     *            {@code partition}
+     */
+    private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition)
+    {
+        final TemporalRow.Resolver resolver = TemporalRow.earliest;
+
+        DeletionInfo deletionInfo = partition.deletionInfo();
+
+        List<Mutation> mutations = new ArrayList<>();
+
+        // Step 1: complex (collection) deletions carried by the updated rows themselves.
+        addComplexTombstones(rowSet, partition, resolver, mutations);
+
+        // Step 2: work out which base rows the partition, range or row deletions may cover.
+        ReadCommand command = deletedRowsReadCommand(rowSet, partition, deletionInfo);
+
+        if (command != null)
+        {
+            // We may have already done this work for another MV update, so check first.
+            if (!rowSet.hasTombstonedExisting())
+            {
+                readDeletedRows(command, rowSet);
+
+                // In case we fetched nothing, avoid re-checking on another MV update.
+                rowSet.setTombstonedExisting();
+            }
+
+            // Step 3: if the temporal row has been deleted by the deletion info, we generate the corresponding
+            // tombstone for the view.
+            for (TemporalRow temporalRow : rowSet)
+            {
+                DeletionTime deletionTime = temporalRow.deletionTime(partition);
+                if (!deletionTime.isLive())
+                {
+                    DecoratedKey value = viewPartitionKey(temporalRow, resolver);
+                    if (value != null)
+                    {
+                        PartitionUpdate update = createTombstone(temporalRow, value, Row.Deletion.regular(deletionTime), resolver, temporalRow.nowInSec);
+                        if (update != null)
+                            mutations.add(new Mutation(update));
+                    }
+                }
+            }
+        }
+
+        // Callers expect null, not an empty collection, when there is nothing to apply.
+        return !mutations.isEmpty() ? mutations : null;
+    }
+
+    /**
+     * Adds one view mutation per non-live complex deletion found on the rows of {@code partition}, for each
+     * complex column of the base table. Rows whose view partition key cannot be resolved are skipped.
+     */
+    private void addComplexTombstones(TemporalRow.Set rowSet,
+                                      AbstractBTreePartition partition,
+                                      TemporalRow.Resolver resolver,
+                                      List<Mutation> mutations)
+    {
+        // Nothing to do when the base table has no complex columns
+        if (columns.baseComplexColumns.isEmpty())
+            return;
+
+        for (Row row : partition)
+        {
+            if (!row.hasComplexDeletion())
+                continue;
+
+            TemporalRow temporalRow = rowSet.getClustering(row.clustering());
+
+            assert temporalRow != null;
+
+            for (ColumnDefinition complexColumn : columns.baseComplexColumns)
+            {
+                ComplexColumnData columnData = row.getComplexColumnData(complexColumn);
+                if (columnData == null)
+                    continue;
+
+                DeletionTime time = columnData.complexDeletion();
+                if (time.isLive())
+                    continue;
+
+                DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
+                if (targetKey != null)
+                    mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, complexColumn, time, resolver, temporalRow.nowInSec)));
+            }
+        }
+    }
+
+    /**
+     * Builds the read command that fetches the base rows which may be covered by the deletions in
+     * {@code partition}, or returns null when the update contains no such deletion.
+     */
+    private ReadCommand deletedRowsReadCommand(TemporalRow.Set rowSet,
+                                               AbstractBTreePartition partition,
+                                               DeletionInfo deletionInfo)
+    {
+        ReadCommand command = null;
+
+        if (!deletionInfo.isLive())
+        {
+            // We have to generate tombstones for all of the affected rows, but we don't have the information in order
+            // to create them. This requires that we perform a read for the entire range that is being tombstoned, and
+            // generate a tombstone for each row read. This may be slow, because a single range tombstone can cover up
+            // to an entire partition of data.
+            DecoratedKey dk = rowSet.dk;
+
+            if (deletionInfo.hasRanges())
+            {
+                // One slice per range tombstone
+                SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
+                Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
+                while (tombstones.hasNext())
+                {
+                    RangeTombstone tombstone = tombstones.next();
+
+                    builder.addSlice(tombstone.deletedSlice());
+                }
+
+                command = builder.build();
+            }
+            else
+            {
+                // No ranges: read the whole base partition
+                command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
+            }
+        }
+
+        if (command == null)
+        {
+            // No partition-level deletion info: fall back to one slice per row that carries a row deletion.
+            SinglePartitionSliceBuilder builder = null;
+            for (Row row : partition)
+            {
+                if (!row.deletion().isLive())
+                {
+                    if (builder == null)
+                        builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+                    builder.addSlice(Slice.make(row.clustering()));
+                }
+            }
+
+            if (builder != null)
+                command = builder.build();
+        }
+
+        return command;
+    }
+
+    /**
+     * Pages through the result of {@code command} (128 per page) and adds every returned row to {@code rowSet}.
+     * Reading stops at the first page that yields no partition.
+     */
+    private void readDeletedRows(ReadCommand command, TemporalRow.Set rowSet)
+    {
+        QueryPager pager = command.getPager(null);
+
+        // Add all of the rows which were recovered from the query to the row set
+        while (!pager.isExhausted())
+        {
+            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                 PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+            {
+                if (!iter.hasNext())
+                    break;
+
+                try (RowIterator rowIterator = iter.next())
+                {
+                    while (rowIterator.hasNext())
+                    {
+                        Row row = rowIterator.next();
+                        rowSet.addRow(row, false);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Read and update temporal rows in the set which have corresponding values stored on the local node
+     */
+    private void readLocalRows(TemporalRow.Set rowSet)
+    {
+        // One slice per temporal row, all within the row set's base partition.
+        SinglePartitionSliceBuilder sliceBuilder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+        for (TemporalRow temporalRow : rowSet)
+        {
+            sliceBuilder.addSlice(temporalRow.baseSlice());
+        }
+
+        QueryPager localPager = sliceBuilder.build().getPager(null);
+
+        // Page through the result (128 per page) and record every row as existing (non-new) data.
+        while (!localPager.isExhausted())
+        {
+            try (ReadOrderGroup orderGroup = localPager.startOrderGroup();
+                 PartitionIterator partitions = localPager.fetchPageInternal(128, orderGroup))
+            {
+                while (partitions.hasNext())
+                {
+                    try (RowIterator existingRows = partitions.next())
+                    {
+                        while (existingRows.hasNext())
+                            rowSet.addRow(existingRows.next(), false);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Set of rows which are contained in the partition update {@code partition}
+     */
+    private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
+    {
+        // Start from an empty set keyed by the update's partition key ...
+        TemporalRow.Set updatedRows = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
+
+        // ... and register every row of the update as new data.
+        for (Row updatedRow : partition)
+        {
+            updatedRows.addRow(updatedRow, true);
+        }
+
+        return updatedRows;
+    }
+
+    /**
+     * Splits the partition update up and adds the existing state to each row.
+     * This data can be reused for multiple MV updates on the same base table
+     *
+     * @param partition the mutation
+     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+     *                   since all of the update will already be present in the base table.
+     * @return The set of temporal rows contained in this update
+     */
+    public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
+    {
+        if (!updateAffectsView(partition))
+            return null;
+
+        Set<ColumnIdentifier> columns = new HashSet<>(this.columns.primaryKeyDefs.size());
+        for (ColumnDefinition def : this.columns.primaryKeyDefs)
+            columns.add(def.name);
+
+        TemporalRow.Set rowSet;
+        if (existing == null)
+        {
+            rowSet = separateRows(partition, columns);
+
+            // If we are building the view, we do not want to add old values; they will always be the same
+            if (!isBuilding)
+                readLocalRows(rowSet);
+        }
+        else
+        {
+            rowSet = existing.withNewViewPrimaryKey(columns);
+        }
+
+        return rowSet;
+    }
+
+
+    /**
+     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+     *                   since all of the update will already be present in the base table.
+     * @return View mutations which represent the changes necessary as long as previously created mutations for the view
+     *         have been applied successfully. This is based solely on the changes that are necessary given the current
+     *         state of the base table and the newly applying partition data.
+     */
+    public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
+    {
+        if (!updateAffectsView(partition))
+            return null;
+
+        // Allocated lazily so that "nothing to do" is reported as null.
+        Collection<Mutation> result = null;
+
+        for (TemporalRow temporalRow : rowSet)
+        {
+            // If we are building, there is no need to check for partition tombstones; those values will not be present
+            // in the partition data
+            PartitionUpdate tombstone = isBuilding ? null : createRangeTombstoneForRow(temporalRow);
+            if (tombstone != null)
+            {
+                if (result == null)
+                    result = new LinkedList<>();
+                result.add(new Mutation(tombstone));
+            }
+
+            PartitionUpdate insert = createUpdatesForInserts(temporalRow);
+            if (insert != null)
+            {
+                if (result == null)
+                    result = new LinkedList<>();
+                result.add(new Mutation(insert));
+            }
+        }
+
+        // Deletion info is likewise only considered when the view is not being built.
+        if (isBuilding)
+            return result;
+
+        Collection<Mutation> deletions = createForDeletionInfo(rowSet, partition);
+        if (deletions != null && !deletions.isEmpty())
+        {
+            if (result == null)
+                result = new LinkedList<>();
+            result.addAll(deletions);
+        }
+
+        return result;
+    }
+
+    public synchronized void build()
+    {
+        // Stop and forget any builder created by a previous call before starting a new one.
+        if (builder != null)
+        {
+            builder.stop();
+            builder = null;
+        }
+
+        // Create the replacement builder and hand it to the compaction manager.
+        builder = new ViewBuilder(baseCfs, this);
+        CompactionManager.instance.submitViewBuilder(builder);
+    }
+
+    @Nullable
+    public static CFMetaData findBaseTable(String keyspace, String viewName)
+    {
+        // No registered view under that name means there is no base table to report.
+        ViewDefinition viewDefinition = Schema.instance.getView(keyspace, viewName);
+        if (viewDefinition == null)
+            return null;
+
+        return Schema.instance.getCFMetaData(viewDefinition.baseTableId);
+    }
+
+    public static Iterable<ViewDefinition> findAll(String keyspace, String baseTable)
+    {
+        KeyspaceMetadata keyspaceMetadata = Schema.instance.getKSMetaData(keyspace);
+        final UUID baseTableId = Schema.instance.getId(keyspace, baseTable);
+
+        // Keep only the keyspace's views whose base table id matches the requested table.
+        return Iterables.filter(keyspaceMetadata.views,
+                                candidate -> candidate.baseTableId.equals(baseTableId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
new file mode 100644
index 0000000..62aa332
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class ViewBuilder extends CompactionInfo.Holder
+{
+    private final ColumnFamilyStore baseCfs;
+    private final View view;
+    private final UUID compactionId;
+    private volatile Token prevToken = null;
+
+    private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);
+
+    private volatile boolean isStopped = false;
+
+    public ViewBuilder(ColumnFamilyStore baseCfs, View view)
+    {
+        this.baseCfs = baseCfs;
+        this.view = view;
+        compactionId = UUIDGen.getTimeUUID();
+    }
+
+    private void buildKey(DecoratedKey key)
+    {
+        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
+
+        while (!pager.isExhausted())
+        {
+           try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
+           {
+               if (!partitionIterator.hasNext())
+                   return;
+
+               try (RowIterator rowIterator = partitionIterator.next())
+               {
+                   FilteredPartition partition = FilteredPartition.create(rowIterator);
+                   TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
+
+                   Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
+
+                   if (mutations != null)
+                       StorageProxy.mutateMV(key.getKey(), mutations, true);
+               }
+           }
+        }
+    }
+
+    public void run()
+    {
+        String ksname = baseCfs.metadata.ksName, viewName = view.name;
+
+        if (SystemKeyspace.isViewBuilt(ksname, viewName))
+            return;
+
+        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+        final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
+        Token lastToken;
+        Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
+        if (buildStatus == null)
+        {
+            baseCfs.forceBlockingFlush();
+            function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL);
+            int generation = Integer.MIN_VALUE;
+
+            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
+            {
+                for (SSTableReader reader : temp)
+                {
+                    generation = Math.max(reader.descriptor.generation, generation);
+                }
+            }
+
+            SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+            lastToken = null;
+        }
+        else
+        {
+            function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
+            {
+                @Nullable
+                public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
+                {
+                    Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL).apply(view);
+                    if (readers != null)
+                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
+                    return null;
+                }
+            };
+            lastToken = buildStatus.right;
+        }
+
+        prevToken = lastToken;
+        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
+             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
+        {
+            while (!isStopped && iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                Token token = key.getToken();
+                if (lastToken == null || lastToken.compareTo(token) < 0)
+                {
+                    for (Range<Token> range : ranges)
+                    {
+                        if (range.contains(token))
+                        {
+                            buildKey(key);
+
+                            if (prevToken == null || prevToken.compareTo(token) != 0)
+                            {
+                                SystemKeyspace.updateViewBuildStatus(ksname, viewName, key.getToken());
+                                prevToken = token;
+                            }
+                        }
+                    }
+                    lastToken = null;
+                }
+            }
+
+            if (!isStopped)
+            SystemKeyspace.finishViewBuildStatus(ksname, viewName);
+
+        }
+        catch (Exception e)
+        {
+            final ViewBuilder builder = new ViewBuilder(baseCfs, view);
+            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(builder),
+                                                         5,
+                                                         TimeUnit.MINUTES);
+            logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
+        }
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        long rangesLeft = 0, rangesTotal = 0;
+        Token lastToken = prevToken;
+
+        // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
+        // percentage of a range covered by a second range, this is the best approximation that we can calculate.
+        // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
+        // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
+        // has.
+        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
+        {
+            rangesLeft++;
+            rangesTotal++;
+            // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
+            // end of the method.
+            if (lastToken == null || range.contains(lastToken))
+                rangesLeft = 0;
+        }
+        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
+    }
+
+    public void stop()
+    {
+        isStopped = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
new file mode 100644
index 0000000..2364ed1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Manages {@link View}'s for a single {@link Keyspace}. Views are registered via {@link #addView(ViewDefinition)} or
+ * {@link #reload()}, and are also grouped per base table in a {@link ForStore}.
+ *
+ * The main purposes of the manager are to provide a single location for updates to be vetted to see whether they update
+ * any views {@link ViewManager#updatesAffectView(Collection, boolean)}, provide locks to prevent multiple
+ * updates from creating incoherent updates in the view {@link ViewManager#acquireLockFor(ByteBuffer)}, and
+ * to affect change on the view.
+ */
+public class ViewManager
+{
+    public class ForStore // views registered under one base table id (see forTable); inner class, so it reads the enclosing keyspace
+    {
+        private final ConcurrentNavigableMap<String, View> viewsByName; // keyed by view.name (see addView)
+
+        public ForStore()
+        {
+            this.viewsByName = new ConcurrentSkipListMap<>();
+        }
+
+        public Iterable<View> allViews() // the map's values() collection itself, not a copy
+        {
+            return viewsByName.values();
+        }
+
+        public Iterable<ColumnFamilyStore> allViewsCfs() // new list with the store of each view, looked up in the keyspace by view name
+        {
+            List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
+            for (View view : allViews())
+                viewColumnFamilies.add(keyspace.getColumnFamilyStore(view.getDefinition().viewName));
+            return viewColumnFamilies;
+        }
+
+        public void forceBlockingFlush() // calls forceBlockingFlush() on each view store in turn
+        {
+            for (ColumnFamilyStore viewCfs : allViewsCfs())
+                viewCfs.forceBlockingFlush();
+        }
+
+        public void dumpMemtables() // calls dumpMemtable() on each view store in turn
+        {
+            for (ColumnFamilyStore viewCfs : allViewsCfs())
+                viewCfs.dumpMemtable();
+        }
+
+        public void truncateBlocking(long truncatedAt) // per view store: discardSSTables(truncatedAt), then save a truncation record
+        {
+            for (ColumnFamilyStore viewCfs : allViewsCfs())
+            {
+                ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
+                SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter); // record carries the returned replay position
+            }
+        }
+
+        public void addView(View view) // an existing entry with the same name is overwritten
+        {
+            viewsByName.put(view.name, view);
+        }
+
+        public void removeView(String name) // no effect when the name is not registered
+        {
+            viewsByName.remove(name);
+        }
+    }
+
+    private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentViewWriters() * 1024);
+
+    private static final boolean enableCoordinatorBatchlog = Boolean.getBoolean("cassandra.mv_enable_coordinator_batchlog");
+
+    private final ConcurrentNavigableMap<UUID, ForStore> viewManagersByStore;
+    private final ConcurrentNavigableMap<String, View> viewsByName;
+    private final Keyspace keyspace;
+
+    public ViewManager(Keyspace keyspace)
+    {
+        this.viewManagersByStore = new ConcurrentSkipListMap<>();
+        this.viewsByName = new ConcurrentSkipListMap<>();
+        this.keyspace = keyspace;
+    }
+
+    /**
+     * Calculates and pushes updates to the views replicas. The replicas are determined by
+     * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+     */
+    public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
+    {
+        List<Mutation> mutations = null; // created lazily; stays null when no view yields a mutation
+        TemporalRow.Set temporalRows = null;
+        for (Map.Entry<String, View> view : viewsByName.entrySet())
+        {
+            temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false); // the previous set is handed on to the next view
+
+            Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
+            if (viewMutations != null && !viewMutations.isEmpty())
+            {
+                if (mutations == null)
+                    mutations = Lists.newLinkedList();
+                mutations.addAll(viewMutations);
+            }
+        }
+
+        if (mutations != null) // all views' mutations go out in a single mutateMV call, keyed by the base partition key
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
+    }
+
+    public boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean coordinatorBatchlog)
+    {
+        if (coordinatorBatchlog && !enableCoordinatorBatchlog) // opt-in via -Dcassandra.mv_enable_coordinator_batchlog
+            return false;
+
+        for (IMutation mutation : mutations)
+        {
+            for (PartitionUpdate cf : mutation.getPartitionUpdates())
+            {
+                assert keyspace.getName().equals(cf.metadata().ksName); // every update must target this manager's keyspace
+
+                if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
+                    continue; // coordinator-batchlog checks ignore updates when the replication factor is 1
+
+                for (View view : allViews())
+                {
+                    if (!cf.metadata().cfId.equals(view.getDefinition().baseTableId))
+                        continue; // this view is defined on a different base table
+
+                    if (view.updateAffectsView(cf))
+                        return true; // first affected view is enough
+                }
+            }
+        }
+
+        return false;
+    }
+
+    public Iterable<View> allViews() // values() of the keyspace-wide map, i.e. views of every base table
+    {
+        return viewsByName.values();
+    }
+
+    public void update(String viewName) // calls build() on the named view, then hands it the definition from the keyspace metadata
+    {
+        View view = viewsByName.get(viewName);
+        assert view != null : "When updating a view, it should already be in the ViewManager";
+        view.build();
+
+        // We provide the new definition from the base metadata
+        Optional<ViewDefinition> viewDefinition = keyspace.getMetadata().views.get(viewName);
+        assert viewDefinition.isPresent() : "When updating a view, it should still be in the Keyspaces views";
+        view.updateDefinition(viewDefinition.get());
+    }
+
+    public void reload() // brings the registered views in line with keyspace.getMetadata().views
+    {
+        Map<String, ViewDefinition> newViewsByName = new HashMap<>(); // definitions from the metadata, by view name
+        for (ViewDefinition definition : keyspace.getMetadata().views)
+        {
+            newViewsByName.put(definition.viewName, definition);
+        }
+
+        for (String viewName : viewsByName.keySet()) // pass 1: remove views that are no longer in the metadata
+        {
+            if (!newViewsByName.containsKey(viewName))
+                removeView(viewName);
+        }
+
+        for (Map.Entry<String, ViewDefinition> entry : newViewsByName.entrySet()) // pass 2: add views not yet registered
+        {
+            if (!viewsByName.containsKey(entry.getKey()))
+                addView(entry.getValue());
+        }
+
+        for (View view : allViews()) // pass 3: call build() on every view and refresh its definition
+        {
+            view.build();
+            // We provide the new definition from the base metadata
+            view.updateDefinition(newViewsByName.get(view.name));
+        }
+    }
+
+    public void addView(ViewDefinition definition) // registers the view in its base table's ForStore and in the keyspace-wide map
+    {
+        View view = new View(definition, keyspace.getColumnFamilyStore(definition.baseTableId)); // second argument is the base table's store
+        forTable(view.getDefinition().baseTableId).addView(view);
+        viewsByName.put(definition.viewName, view);
+    }
+
+    public void removeView(String name) // unregisters the view everywhere and calls SystemKeyspace.setViewRemoved
+    {
+        View view = viewsByName.remove(name);
+
+        if (view == null) // unknown name: nothing to clean up
+            return;
+
+        forTable(view.getDefinition().baseTableId).removeView(name);
+        SystemKeyspace.setViewRemoved(keyspace.getName(), view.name);
+    }
+
+    public void buildAllViews() // calls build() on every registered view
+    {
+        for (View view : allViews())
+            view.build();
+    }
+
+    public ForStore forTable(UUID baseId) // returns the ForStore for baseId, registering a new one on first use
+    {
+        ForStore forStore = viewManagersByStore.get(baseId);
+        if (forStore == null)
+        {
+            forStore = new ForStore();
+            ForStore previous = viewManagersByStore.putIfAbsent(baseId, forStore); // put() would replace one added since the get() above
+            if (previous != null)
+                forStore = previous; // lost the race: return the instance that is actually in the map
+        }
+        return forStore;
+    }
+
+    public static Lock acquireLockFor(ByteBuffer key) // non-blocking: returns the lock already held, or null if tryLock() failed
+    {
+        Lock lock = LOCKS.get(key); // striped: the key selects one lock from the shared LOCKS stripes
+
+        if (lock.tryLock())
+            return lock;
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
new file mode 100644
index 0000000..628142d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public final class ViewUtils
+{
+    private ViewUtils() // static helpers only; never instantiated
+    {
+    }
+
+    /**
+     * Calculate the natural endpoint for the view.
+     *
+     * The view natural endpoint is the endpoint which has the same cardinality as this node in the replication factor.
+     * The cardinality is the number at which this node would store a piece of data, given the change in replication
+     * factor.
+     *
+     * For example, if we have the following ring:
+     *   A, T1 -> B, T2 -> C, T3 -> A
+     *
+     * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would
+     * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality
+     * for T3 is 2.
+     *
+     * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be:
+     *  A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3)
+     *  B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
+     *  C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
+     *
+     * @throws RuntimeException if this method is called using a base token which does not belong to this replica
+     */
+    public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+    {
+        AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
+
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        List<InetAddress> localBaseEndpoints = new ArrayList<>();
+        List<InetAddress> localViewEndpoints = new ArrayList<>();
+        for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
+        {
+            if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
+                localBaseEndpoints.add(baseEndpoint); // only base replicas in this node's datacenter are considered
+        }
+
+        for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
+        {
+            // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
+            if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
+                return viewEndpoint;
+
+            // We have to remove any endpoint which is shared between the base and the view, as it will select itself
+            // and throw off the counts otherwise.
+            if (localBaseEndpoints.contains(viewEndpoint))
+                localBaseEndpoints.remove(viewEndpoint);
+            else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
+                localViewEndpoints.add(viewEndpoint);
+        }
+
+        // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
+        // Since the same replication strategy is used, the same placement should be used and we should get the same
+        // number of replicas for all of the tokens in the ring.
+        assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
+        int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress()); // this node's position among the remaining local base replicas
+
+        if (baseIdx < 0)
+        {
+
+            if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0)
+            {
+                // Since there are pending endpoints, we are going to write to the batchlog regardless,
+                // so we can pretend that we are the view's endpoint.
+
+                return FBUtilities.getBroadcastAddress();
+            }
+
+            throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
+        }
+
+
+        return localViewEndpoints.get(baseIdx); // the paired view replica sits at the same index in the view list
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index 46872c1..bbf6fd6 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -276,7 +276,7 @@ public class ByteOrderedPartitioner implements IPartitioner
 
         for (String ks : Schema.instance.getKeyspaces())
         {
-            for (CFMetaData cfmd : Schema.instance.getTables(ks))
+            for (CFMetaData cfmd : Schema.instance.getTablesAndViews(ks))
             {
                 for (Range<Token> r : sortedRanges)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 464ac3d..96b4ca0 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -216,7 +216,7 @@ public class OrderPreservingPartitioner implements IPartitioner
 
         for (String ks : Schema.instance.getKeyspaces())
         {
-            for (CFMetaData cfmd : Schema.instance.getTables(ks))
+            for (CFMetaData cfmd : Schema.instance.getTablesAndViews(ks))
             {
                 for (Range<Token> r : sortedRanges)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
deleted file mode 100644
index 39a5574..0000000
--- a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.metrics;
-
-import com.codahale.metrics.Counter;
-
-import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
-public class MVWriteMetrics extends ClientRequestMetrics
-{
-    public final Counter viewReplicasAttempted;
-    public final Counter viewReplicasSuccess;
-
-    public MVWriteMetrics(String scope) {
-        super(scope);
-        viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
-        viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
-    }
-
-    public void release()
-    {
-        super.release();
-        Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
-        Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
new file mode 100644
index 0000000..c99cc5c
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class ViewWriteMetrics extends ClientRequestMetrics // adds two view-replica counters to ClientRequestMetrics
+{
+    public final Counter viewReplicasAttempted;
+    public final Counter viewReplicasSuccess;
+
+    public ViewWriteMetrics(String scope) {
+        super(scope);
+        // Both counters are obtained from the Metrics registry under names built by the inherited factory.
+        viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
+        viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
+    }
+
+    public void release() // calls super.release(), then removes the two counters registered by the constructor
+    {
+        super.release();
+        Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
+        Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 372ff6e..0a0d9d8 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -22,9 +22,12 @@ import java.util.Optional;
 import java.util.Set;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
 
+import org.apache.avro.reflect.Nullable;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 /**
@@ -35,51 +38,72 @@ public final class KeyspaceMetadata
     public final String name;
     public final KeyspaceParams params;
     public final Tables tables;
+    public final Views views;
     public final Types types;
     public final Functions functions;
 
-    private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Types types, Functions functions)
+    private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
     {
         this.name = name;
         this.params = params;
         this.tables = tables;
+        this.views = views;
         this.types = types;
         this.functions = functions;
     }
 
     public static KeyspaceMetadata create(String name, KeyspaceParams params)
     {
-        return new KeyspaceMetadata(name, params, Tables.none(), Types.none(), Functions.none());
+        return new KeyspaceMetadata(name, params, Tables.none(), Views.none(), Types.none(), Functions.none());
     }
 
     public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables)
     {
-        return new KeyspaceMetadata(name, params, tables, Types.none(), Functions.none());
+        return new KeyspaceMetadata(name, params, tables, Views.none(), Types.none(), Functions.none());
     }
 
-    public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Types types, Functions functions)
+    public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
     {
-        return new KeyspaceMetadata(name, params, tables, types, functions);
+        return new KeyspaceMetadata(name, params, tables, views, types, functions);
     }
 
     public KeyspaceMetadata withSwapped(KeyspaceParams params)
     {
-        return new KeyspaceMetadata(name, params, tables, types, functions);
+        return new KeyspaceMetadata(name, params, tables, views, types, functions);
     }
 
-    public KeyspaceMetadata withSwapped(Tables tables)
+    public KeyspaceMetadata withSwapped(Tables regular)
     {
-        return new KeyspaceMetadata(name, params, tables, types, functions);
+        return new KeyspaceMetadata(name, params, regular, views, types, functions);
+    }
+
+    public KeyspaceMetadata withSwapped(Views views)
+    {
+        return new KeyspaceMetadata(name, params, tables, views, types, functions);
     }
 
     public KeyspaceMetadata withSwapped(Types types)
     {
-        return new KeyspaceMetadata(name, params, tables, types, functions);
+        return new KeyspaceMetadata(name, params, tables, views, types, functions);
     }
 
     public KeyspaceMetadata withSwapped(Functions functions)
     {
-        return new KeyspaceMetadata(name, params, tables, types, functions);
+        return new KeyspaceMetadata(name, params, tables, views, types, functions);
+    }
+
+    public Iterable<CFMetaData> tablesAndViews() // concatenation: the tables first, then views.metadatas()
+    {
+        return Iterables.concat(tables, views.metadatas());
+    }
+
+    @Nullable
+    public CFMetaData getTableOrViewNullable(String tableOrViewName) // views are checked first; tables are the fallback
+    {
+        ViewDefinition view = views.getNullable(tableOrViewName);
+        return view == null
+             ? tables.getNullable(tableOrViewName) // no such view: result is whatever the table lookup yields
+             : view.metadata;
+    }
 
     public Set<String> existingIndexNames(String cfToExclude)
@@ -94,7 +118,7 @@ public final class KeyspaceMetadata
 
     public Optional<CFMetaData> findIndexedTable(String indexName)
     {
-        for (CFMetaData cfm : tables)
+        for (CFMetaData cfm : tablesAndViews())
             if (cfm.getIndexes().has(indexName))
                 return Optional.of(cfm);
 
@@ -104,7 +128,7 @@ public final class KeyspaceMetadata
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(name, params, tables, functions, types);
+        return Objects.hashCode(name, params, tables, views, functions, types);
     }
 
     @Override
@@ -121,6 +145,7 @@ public final class KeyspaceMetadata
         return name.equals(other.name)
             && params.equals(other.params)
             && tables.equals(other.tables)
+            && views.equals(other.views)
             && functions.equals(other.functions)
             && types.equals(other.types);
     }
@@ -132,6 +157,7 @@ public final class KeyspaceMetadata
                       .add("name", name)
                       .add("params", params)
                       .add("tables", tables)
+                      .add("views", views)
                       .add("functions", functions)
                       .add("types", types)
                       .toString();
@@ -144,6 +170,6 @@ public final class KeyspaceMetadata
                                                            + "or contain non-alphanumeric-underscore characters (got \"%s\")",
                                                            Schema.NAME_LENGTH,
                                                            name));
-        tables.forEach(CFMetaData::validate);
+        tablesAndViews().forEach(CFMetaData::validate);
     }
 }