You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 16:35:15 UTC
[4/6] cassandra git commit: Improve MV schema representation
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
deleted file mode 100644
index e23fd84..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-public class MaterializedViewBuilder extends CompactionInfo.Holder
-{
- private final ColumnFamilyStore baseCfs;
- private final MaterializedView view;
- private final UUID compactionId;
- private volatile Token prevToken = null;
-
- private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class);
-
- private volatile boolean isStopped = false;
-
- public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view)
- {
- this.baseCfs = baseCfs;
- this.view = view;
- compactionId = UUIDGen.getTimeUUID();
- }
-
- private void buildKey(DecoratedKey key)
- {
- QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
-
- while (!pager.isExhausted())
- {
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
- {
- if (!partitionIterator.hasNext())
- return;
-
- try (RowIterator rowIterator = partitionIterator.next())
- {
- FilteredPartition partition = FilteredPartition.create(rowIterator);
- TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
-
- Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
-
- if (mutations != null)
- StorageProxy.mutateMV(key.getKey(), mutations, true);
- }
- }
- }
- }
-
- public void run()
- {
- String ksname = baseCfs.metadata.ksName, viewName = view.name;
-
- if (SystemKeyspace.isViewBuilt(ksname, viewName))
- return;
-
- Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
- final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName);
- Token lastToken;
- Function<View, Iterable<SSTableReader>> function;
- if (buildStatus == null)
- {
- baseCfs.forceBlockingFlush();
- function = View.select(SSTableSet.CANONICAL);
- int generation = Integer.MIN_VALUE;
-
- try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
- {
- for (SSTableReader reader : temp)
- {
- generation = Math.max(reader.descriptor.generation, generation);
- }
- }
-
- SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation);
- lastToken = null;
- }
- else
- {
- function = new Function<View, Iterable<SSTableReader>>()
- {
- @Nullable
- public Iterable<SSTableReader> apply(View view)
- {
- Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view);
- if (readers != null)
- return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
- return null;
- }
- };
- lastToken = buildStatus.right;
- }
-
- prevToken = lastToken;
- try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
- ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
- {
- while (!isStopped && iter.hasNext())
- {
- DecoratedKey key = iter.next();
- Token token = key.getToken();
- if (lastToken == null || lastToken.compareTo(token) < 0)
- {
- for (Range<Token> range : ranges)
- {
- if (range.contains(token))
- {
- buildKey(key);
-
- if (prevToken == null || prevToken.compareTo(token) != 0)
- {
- SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken());
- prevToken = token;
- }
- }
- }
- lastToken = null;
- }
- }
-
- if (!isStopped)
- SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName);
-
- }
- catch (Exception e)
- {
- final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view);
- ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder),
- 5,
- TimeUnit.MINUTES);
- logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
- }
- }
-
- public CompactionInfo getCompactionInfo()
- {
- long rangesLeft = 0, rangesTotal = 0;
- Token lastToken = prevToken;
-
- // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
- // percentage of a range covered by a second range, this is the best approximation that we can calculate.
- // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
- // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
- // has.
- for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
- {
- rangesLeft++;
- rangesTotal++;
- // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
- // end of the method.
- if (lastToken == null || range.contains(lastToken))
- rangesLeft = 0;
- }
- return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
- }
-
- public void stop()
- {
- isStopped = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
deleted file mode 100644
index 41f4ed0..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.Lock;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Striped;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-
-/**
- * Manages {@link MaterializedView}'s for a single {@link ColumnFamilyStore}. All of the materialized views for that
- * table are created when this manager is initialized.
- *
- * The main purposes of the manager are to provide a single location for updates to be vetted to see whether they update
- * any views {@link MaterializedViewManager#updateAffectsView(PartitionUpdate)}, provide locks to prevent multiple
- * updates from creating incoherent updates in the view {@link MaterializedViewManager#acquireLockFor(ByteBuffer)}, and
- * to affect change on the view.
- */
-public class MaterializedViewManager
-{
- private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);
- private static final boolean enableCoordinatorBatchlog = Boolean.getBoolean("cassandra.mv_enable_coordinator_batchlog");
-
- private final ConcurrentNavigableMap<String, MaterializedView> viewsByName;
-
- private final ColumnFamilyStore baseCfs;
-
- public MaterializedViewManager(ColumnFamilyStore baseCfs)
- {
- this.viewsByName = new ConcurrentSkipListMap<>();
-
- this.baseCfs = baseCfs;
- }
-
- public Iterable<MaterializedView> allViews()
- {
- return viewsByName.values();
- }
-
- public Iterable<ColumnFamilyStore> allViewsCfs()
- {
- List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
- for (MaterializedView view : allViews())
- viewColumnFamilies.add(view.getViewCfs());
- return viewColumnFamilies;
- }
-
- public void init()
- {
- reload();
- }
-
- public void invalidate()
- {
- for (MaterializedView view : allViews())
- removeMaterializedView(view.name);
- }
-
- public void reload()
- {
- Map<String, MaterializedViewDefinition> newViewsByName = new HashMap<>();
- for (MaterializedViewDefinition definition : baseCfs.metadata.getMaterializedViews())
- {
- newViewsByName.put(definition.viewName, definition);
- }
-
- for (String viewName : viewsByName.keySet())
- {
- if (!newViewsByName.containsKey(viewName))
- removeMaterializedView(viewName);
- }
-
- for (Map.Entry<String, MaterializedViewDefinition> entry : newViewsByName.entrySet())
- {
- if (!viewsByName.containsKey(entry.getKey()))
- addMaterializedView(entry.getValue());
- }
-
- for (MaterializedView view : allViews())
- {
- view.build();
- // We provide the new definition from the base metadata
- view.updateDefinition(newViewsByName.get(view.name));
- }
- }
-
- public void buildAllViews()
- {
- for (MaterializedView view : allViews())
- view.build();
- }
-
- public void removeMaterializedView(String name)
- {
- MaterializedView view = viewsByName.remove(name);
-
- if (view == null)
- return;
-
- SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, view.name);
- }
-
- public void addMaterializedView(MaterializedViewDefinition definition)
- {
- MaterializedView view = new MaterializedView(definition, baseCfs);
-
- viewsByName.put(definition.viewName, view);
- }
-
- /**
- * Calculates and pushes updates to the views replicas. The replicas are determined by
- * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
- */
- public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
- {
- List<Mutation> mutations = null;
- TemporalRow.Set temporalRows = null;
- for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
- {
-
- temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false);
-
- Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
- if (viewMutations != null && !viewMutations.isEmpty())
- {
- if (mutations == null)
- mutations = Lists.newLinkedList();
- mutations.addAll(viewMutations);
- }
- }
- if (mutations != null)
- {
- StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
- }
- }
-
- public boolean updateAffectsView(PartitionUpdate upd)
- {
- for (MaterializedView view : allViews())
- {
- if (view.updateAffectsView(upd))
- return true;
- }
- return false;
- }
-
- public static Lock acquireLockFor(ByteBuffer key)
- {
- Lock lock = LOCKS.get(key);
-
- if (lock.tryLock())
- return lock;
-
- return null;
- }
-
- public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean coordinatorBatchlog)
- {
- if (coordinatorBatchlog && !enableCoordinatorBatchlog)
- return false;
-
- for (IMutation mutation : mutations)
- {
- for (PartitionUpdate cf : mutation.getPartitionUpdates())
- {
- Keyspace keyspace = Keyspace.open(cf.metadata().ksName);
-
- if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
- continue;
-
- MaterializedViewManager viewManager = keyspace.getColumnFamilyStore(cf.metadata().cfId).materializedViewManager;
- if (viewManager.updateAffectsView(cf))
- return true;
- }
- }
-
- return false;
- }
-
-
- public void forceBlockingFlush()
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- viewCfs.forceBlockingFlush();
- }
-
- public void dumpMemtables()
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- viewCfs.dumpMemtable();
- }
-
- public void truncateBlocking(long truncatedAt)
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- {
- ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
- SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
deleted file mode 100644
index ea81750..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-public final class MaterializedViewUtils
-{
- private MaterializedViewUtils()
- {
- }
-
- /**
- * Calculate the natural endpoint for the view.
- *
- * The view natural endpoint is the endpint which has the same cardinality as this node in the replication factor.
- * The cardinality is the number at which this node would store a piece of data, given the change in replication
- * factor.
- *
- * For example, if we have the following ring:
- * A, T1 -> B, T2 -> C, T3 -> A
- *
- * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would
- * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality
- * for T3 is 2.
- *
- * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be:
- * A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3)
- * B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
- * C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
- *
- * @throws RuntimeException if this method is called using a base token which does not belong to this replica
- */
- public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
- {
- AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
-
- String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
- List<InetAddress> localBaseEndpoints = new ArrayList<>();
- List<InetAddress> localViewEndpoints = new ArrayList<>();
- for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
- {
- if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
- localBaseEndpoints.add(baseEndpoint);
- }
-
- for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
- {
- // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
- if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
- return viewEndpoint;
-
- // We have to remove any endpoint which is shared between the base and the view, as it will select itself
- // and throw off the counts otherwise.
- if (localBaseEndpoints.contains(viewEndpoint))
- localBaseEndpoints.remove(viewEndpoint);
- else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
- localViewEndpoints.add(viewEndpoint);
- }
-
- // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
- // Since the same replication strategy is used, the same placement should be used and we should get the same
- // number of replicas for all of the tokens in the ring.
- assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
- int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
-
- if (baseIdx < 0)
- {
-
- if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0)
- {
- //Since there are pending endpoints we are going to store hints this in the batchlog regardless.
- //So we can pretend we are the views endpoint.
-
- return FBUtilities.getBroadcastAddress();
- }
-
- throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
- }
-
-
- return localViewEndpoints.get(baseIdx);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
new file mode 100644
index 0000000..7bcb592
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
+import org.apache.cassandra.db.CBuilder;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.service.pager.QueryPager;
+
+/**
+ * A View copies data from a base table into a view table which can be queried independently from the
+ * base. Every update which targets the base table must be fed through the {@link ViewManager} to ensure
+ * that if a view needs to be updated, the updates are properly created and fed into the view.
+ *
+ * This class does the job of translating the base row to the view row.
+ *
+ * It handles reading existing state and figuring out what tombstones need to be generated.
+ *
+ * {@link View#createMutations(AbstractBTreePartition, TemporalRow.Set, boolean)} is the "main method"
+ *
+ */
+public class View
+{
+ /**
+ * The columns should all be updated together, so we use this object as a group.
+ */
+ private static class Columns
+ {
+ //These are the base column definitions in terms of the *view's* partitioning.
+ //Meaning we can see (for example) the partition key of the view contains a clustering key
+ //from the base table.
+ public final List<ColumnDefinition> partitionDefs;
+ public final List<ColumnDefinition> primaryKeyDefs;
+ public final List<ColumnDefinition> baseComplexColumns;
+
+ private Columns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
+ {
+ this.partitionDefs = partitionDefs;
+ this.primaryKeyDefs = primaryKeyDefs;
+ this.baseComplexColumns = baseComplexColumns;
+ }
+ }
+
+ public final String name;
+ private volatile ViewDefinition definition;
+
+ private final ColumnFamilyStore baseCfs;
+
+ private Columns columns;
+
+ private final boolean viewHasAllPrimaryKeys;
+ private final boolean includeAllColumns;
+ private ViewBuilder builder;
+
+ public View(ViewDefinition definition,
+ ColumnFamilyStore baseCfs)
+ {
+ this.baseCfs = baseCfs;
+
+ name = definition.viewName;
+ includeAllColumns = definition.includeAllColumns;
+
+ viewHasAllPrimaryKeys = updateDefinition(definition);
+ }
+
+ public ViewDefinition getDefinition()
+ {
+ return definition;
+ }
+
+ /**
+ * Lookup column definitions in the base table that correspond to the view columns (should be 1:1)
+ *
+ * Reports to the caller whether every column that was looked up is a primary key column of the base table. We
+ * do this to simplify tombstone checks.
+ *
+ * @param columns a list of columns to lookup in the base table
+ * @param definitions lists to populate for the base table definitions
+ * @return true if all view PKs are also Base PKs
+ */
+ private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
+ {
+ boolean allArePrimaryKeys = true;
+ for (ColumnIdentifier identifier : columns)
+ {
+ ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
+ assert cdef != null : "Could not resolve column " + identifier.toString();
+
+ for (List<ColumnDefinition> list : definitions)
+ {
+ list.add(cdef);
+ }
+
+ allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
+ }
+
+ return allArePrimaryKeys;
+ }
+
+ /**
+ * This updates the columns stored which are dependent on the base CFMetaData.
+ *
+ * @return true if every partition key and clustering column of the view is also a primary key column of the
+ * base table; false if at least one of them is not.
+ */
+ public boolean updateDefinition(ViewDefinition definition)
+ {
+ this.definition = definition;
+
+ CFMetaData viewCfm = definition.metadata;
+ List<ColumnDefinition> partitionDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size());
+ List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(viewCfm.partitionKeyColumns().size()
+ + viewCfm.clusteringColumns().size());
+ List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
+
+ // We only add the partition columns to the partitions list, but both partition columns and clustering
+ // columns are added to the primary keys list
+ boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(Iterables.transform(viewCfm.partitionKeyColumns(), cd -> cd.name), primaryKeyDefs, partitionDefs);
+ boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(Iterables.transform(viewCfm.clusteringColumns(), cd -> cd.name), primaryKeyDefs);
+
+ for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
+ {
+ if (cdef.isComplex())
+ {
+ baseComplexColumns.add(cdef);
+ }
+ }
+
+ this.columns = new Columns(partitionDefs, primaryKeyDefs, baseComplexColumns);
+
+ return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
+ }
+
+ /**
+ * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
+ * <ul>
+ * <li>View selects all columns</li>
+ * <li>Update contains any range tombstones</li>
+ * <li>Update touches one of the columns included in the view</li>
+ * </ul>
+ *
+ * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
+ * currently included in the view.
+ *
+ * @return true if {@code partition} could possibly modify the view; false otherwise
+ */
+ public boolean updateAffectsView(AbstractBTreePartition partition)
+ {
+ // If we are including all of the columns, then any update will be included
+ if (includeAllColumns)
+ return true;
+
+ // If there are range tombstones, tombstones will also need to be generated for the view
+ // This requires a query of the base rows and generating tombstones for all of those values
+ if (!partition.deletionInfo().isLive())
+ return true;
+
+ // Check each row for deletion or update
+ for (Row row : partition)
+ {
+ if (row.hasComplexDeletion())
+ return true;
+ if (!row.deletion().isLive())
+ return true;
+
+ for (ColumnData data : row)
+ {
+ if (definition.metadata.getColumnDefinition(data.column().name) != null)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Creates the clustering columns for the view based on the specified row and resolver policy
+ *
+ * @param temporalRow The current row
+ * @param resolver The policy to use when selecting versions of cells
+ * @return The clustering object to use for the view
+ */
+ private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+ {
+ CFMetaData viewCfm = definition.metadata;
+ int numViewClustering = viewCfm.clusteringColumns().size();
+ CBuilder clustering = CBuilder.create(viewCfm.comparator);
+ for (int i = 0; i < numViewClustering; i++)
+ {
+ ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
+ clustering.add(temporalRow.clusteringValue(definition, resolver));
+ }
+
+ return clustering.build();
+ }
+
+ /**
+ * @return PartitionUpdate containing a row deletion for the view row identified by the partition key and TemporalRow.
+ */
+ private PartitionUpdate createTombstone(TemporalRow temporalRow,
+ DecoratedKey partitionKey,
+ Row.Deletion deletion,
+ TemporalRow.Resolver resolver,
+ int nowInSec)
+ {
+ CFMetaData viewCfm = definition.metadata;
+ Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
+ builder.newRow(viewClustering(temporalRow, resolver));
+ builder.addRowDeletion(deletion);
+ return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
+ }
+
+    /**
+     * Builds a single-row update for the view which carries nothing but the deletion of one complex column.
+     *
+     * @param temporalRow The row the view clustering is resolved from
+     * @param partitionKey The view partition the tombstone is written to
+     * @param deletedColumn The complex column being deleted
+     * @param deletionTime The deletion to record for that column
+     * @param resolver The policy used to resolve the clustering values
+     * @param nowInSec The time handed to the row builder
+     * @return PartitionUpdate containing the complex tombstone for the TemporalRow
+     */
+    private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
+                                                   DecoratedKey partitionKey,
+                                                   ColumnDefinition deletedColumn,
+                                                   DeletionTime deletionTime,
+                                                   TemporalRow.Resolver resolver,
+                                                   int nowInSec)
+    {
+        Row.Builder rowBuilder = BTreeRow.unsortedBuilder(nowInSec);
+        rowBuilder.newRow(viewClustering(temporalRow, resolver));
+        rowBuilder.addComplexDeletion(deletedColumn, deletionTime);
+
+        Row deletedRow = rowBuilder.build();
+        return PartitionUpdate.singleRowUpdate(definition.metadata, partitionKey, deletedRow);
+    }
+
+    /**
+     * Resolves every partition key component of the view from the temporal row and decorates the resulting key.
+     *
+     * @param temporalRow The row the key components are read from
+     * @param resolver The policy used to pick which version of each value is used
+     * @return View's DecoratedKey, or null as soon as one of the components resolves to null
+     */
+    private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+    {
+        List<ColumnDefinition> partitionColumns = this.columns.partitionDefs;
+        Object[] components = new Object[partitionColumns.size()];
+
+        int position = 0;
+        for (ColumnDefinition partitionColumn : partitionColumns)
+        {
+            ByteBuffer component = temporalRow.clusteringValue(partitionColumn, resolver);
+
+            // A single unresolved component means no key can be built
+            if (component == null)
+                return null;
+
+            components[position++] = component;
+        }
+
+        CFMetaData viewMetadata = definition.metadata;
+        return viewMetadata.decorateKey(
+            CFMetaData.serializePartitionKey(viewMetadata.getKeyValidatorAsClusteringComparator().make(components)));
+    }
+
+    /**
+     * Creates the shadowable tombstone for the view row which the given TemporalRow referenced before the update.
+     * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one
+     * update is necessary.
+     *
+     * @param temporalRow The row to generate the tombstone for
+     * @return update which contains the tombstone for the referenced TemporalRow, or null if not necessary: the view
+     *         has all primary keys, none of the checked columns has an old value, or no view partition key can be
+     *         resolved for the old values
+     */
+    private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
+    {
+        // Primary Key and Clustering columns do not generate tombstones
+        if (viewHasAllPrimaryKeys)
+            return null;
+
+        boolean hasUpdate = false;
+        for (ColumnDefinition viewKeyColumn : this.columns.primaryKeyDefs)
+        {
+            // Only columns which are not primary key columns according to their own definition are checked
+            if (!viewKeyColumn.isPrimaryKeyColumn()
+                && temporalRow.clusteringValue(viewKeyColumn, TemporalRow.oldValueIfUpdated) != null)
+                hasUpdate = true;
+        }
+
+        if (!hasUpdate)
+            return null;
+
+        TemporalRow.Resolver resolver = TemporalRow.earliest;
+        DecoratedKey viewKey = viewPartitionKey(temporalRow, resolver);
+
+        // viewPartitionKey returns null when a key component cannot be resolved. Every other caller checks for
+        // that; without a key there is no partition to write the tombstone to.
+        if (viewKey == null)
+            return null;
+
+        return createTombstone(temporalRow,
+                               viewKey,
+                               Row.Deletion.shadowable(new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec)),
+                               resolver,
+                               temporalRow.nowInSec);
+    }
+
+    /**
+     * Transforms the latest state of a temporal row into the corresponding view row.
+     *
+     * @param temporalRow The row to transform
+     * @return PartitionUpdate which is the transformed base table update for the view, or null if no view
+     *         partition key can be resolved for the row
+     */
+    private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
+    {
+        TemporalRow.Resolver resolver = TemporalRow.latest;
+
+        DecoratedKey viewKey = viewPartitionKey(temporalRow, resolver);
+
+        // Not having a partition key means we aren't updating anything
+        if (viewKey == null)
+            return null;
+
+        CFMetaData viewMetadata = definition.metadata;
+
+        Row.Builder rowBuilder = BTreeRow.unsortedBuilder(temporalRow.nowInSec);
+        rowBuilder.newRow(viewClustering(temporalRow, resolver));
+        rowBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewMetadata,
+                                                                 temporalRow.viewClusteringTimestamp(),
+                                                                 temporalRow.viewClusteringTtl(),
+                                                                 temporalRow.viewClusteringLocalDeletionTime()));
+
+        // Primary key columns are already covered by the key and the clustering; only the other columns carry cells
+        for (ColumnDefinition column : viewMetadata.allColumns())
+        {
+            if (!column.isPrimaryKeyColumn())
+            {
+                for (Cell cell : temporalRow.values(column, resolver))
+                    rowBuilder.addCell(cell);
+            }
+        }
+
+        return PartitionUpdate.singleRowUpdate(viewMetadata, viewKey, rowBuilder.build());
+    }
+
+ /**
+ * Generates the view tombstones required by the deletions carried in a base table update. Three sources are
+ * handled in turn: complex column deletions on individual rows, the partition's deletion info (partition level or
+ * range tombstones), and, when the deletion info is live, row deletions.
+ *
+ * @param rowSet Temporal rows of this update; rows read back from the base table are added to this set
+ * @param partition Update which possibly contains deletion info for which to generate view tombstones.
+ * @return View Tombstones which delete all of the rows which have been removed from the base table with
+ * {@code partition}, or null if no tombstone is needed
+ */
+ private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition)
+ {
+ // Every view key and clustering below is resolved with the 'earliest' resolver
+ final TemporalRow.Resolver resolver = TemporalRow.earliest;
+
+ DeletionInfo deletionInfo = partition.deletionInfo();
+
+ List<Mutation> mutations = new ArrayList<>();
+
+ // Check the complex columns to see if there are any which may have tombstones we need to create for the view
+ if (!columns.baseComplexColumns.isEmpty())
+ {
+ for (Row row : partition)
+ {
+ if (!row.hasComplexDeletion())
+ continue;
+
+ TemporalRow temporalRow = rowSet.getClustering(row.clustering());
+
+ // The row comes from the update itself, so the set is expected to hold a temporal row for it
+ assert temporalRow != null;
+
+ for (ColumnDefinition definition : columns.baseComplexColumns)
+ {
+ ComplexColumnData columnData = row.getComplexColumnData(definition);
+
+ if (columnData != null)
+ {
+ DeletionTime time = columnData.complexDeletion();
+ if (!time.isLive())
+ {
+ // No tombstone is created when the view partition key cannot be resolved
+ DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
+ if (targetKey != null)
+ mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
+ }
+ }
+ }
+ }
+ }
+
+ // Read command covering the deleted part of the base partition; stays null when nothing was deleted
+ ReadCommand command = null;
+
+ if (!deletionInfo.isLive())
+ {
+ // We have to generate tombstones for all of the affected rows, but we don't have the information in order
+ // to create them. This requires that we perform a read for the entire range that is being tombstoned, and
+ // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an
+ // entire partition of data.
+ DecoratedKey dk = rowSet.dk;
+
+ if (deletionInfo.hasRanges())
+ {
+ // Only the slices covered by the range tombstones need to be read
+ SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
+ Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
+ while (tombstones.hasNext())
+ {
+ RangeTombstone tombstone = tombstones.next();
+
+ builder.addSlice(tombstone.deletedSlice());
+ }
+
+ command = builder.build();
+ }
+ else
+ {
+ // Not live and without ranges: read the whole partition
+ command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
+ }
+ }
+
+ if (command == null)
+ {
+ // The deletion info is live; fall back to reading only the rows which carry a row deletion
+ SinglePartitionSliceBuilder builder = null;
+ for (Row row : partition)
+ {
+ if (!row.deletion().isLive())
+ {
+ if (builder == null)
+ builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+ builder.addSlice(Slice.make(row.clustering()));
+ }
+ }
+
+ if (builder != null)
+ command = builder.build();
+ }
+
+ if (command != null)
+ {
+
+ // We may have already done this work for
+ // another MV update, so check
+
+ if (!rowSet.hasTombstonedExisting())
+ {
+ QueryPager pager = command.getPager(null);
+
+ // Add all of the rows which were recovered from the query to the row set
+ while (!pager.isExhausted())
+ {
+ try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+ PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+ {
+ if (!iter.hasNext())
+ break;
+
+ // The command targets a single partition, so only the first partition of a page is consumed
+ try (RowIterator rowIterator = iter.next())
+ {
+ while (rowIterator.hasNext())
+ {
+ Row row = rowIterator.next();
+ rowSet.addRow(row, false);
+ }
+ }
+ }
+ }
+
+ // In case we fetched nothing, avoid re-checking on another MV update
+ rowSet.setTombstonedExisting();
+ }
+
+ // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
+ // for the view.
+ for (TemporalRow temporalRow : rowSet)
+ {
+ DeletionTime deletionTime = temporalRow.deletionTime(partition);
+ if (!deletionTime.isLive())
+ {
+ DecoratedKey value = viewPartitionKey(temporalRow, resolver);
+ if (value != null)
+ {
+ PartitionUpdate update = createTombstone(temporalRow, value, Row.Deletion.regular(deletionTime), resolver, temporalRow.nowInSec);
+ if (update != null)
+ mutations.add(new Mutation(update));
+ }
+ }
+ }
+ }
+
+ // Callers treat null as "nothing to do"
+ return !mutations.isEmpty() ? mutations : null;
+ }
+
+    /**
+     * Reads the base table slices of every temporal row in the set and adds the rows which are returned to the set.
+     *
+     * @param rowSet The set whose rows are looked up and which receives the rows that were read
+     */
+    private void readLocalRows(TemporalRow.Set rowSet)
+    {
+        SinglePartitionSliceBuilder sliceBuilder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+        for (TemporalRow temporalRow : rowSet)
+        {
+            sliceBuilder.addSlice(temporalRow.baseSlice());
+        }
+
+        QueryPager pager = sliceBuilder.build().getPager(null);
+        while (!pager.isExhausted())
+        {
+            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                 PartitionIterator partitions = pager.fetchPageInternal(128, orderGroup))
+            {
+                while (partitions.hasNext())
+                {
+                    try (RowIterator existingRows = partitions.next())
+                    {
+                        while (existingRows.hasNext())
+                        {
+                            Row existingRow = existingRows.next();
+                            rowSet.addRow(existingRow, false);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a new temporal row set for the partition's key and adds every row of the update to it.
+     *
+     * @param partition The partition update to split into rows
+     * @param viewPrimaryKeyCols Names of the columns which make up the view's primary key
+     * @return Set of rows which are contained in the partition update {@code partition}
+     */
+    private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
+    {
+        TemporalRow.Set separated = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
+
+        for (Row updatedRow : partition)
+        {
+            separated.addRow(updatedRow, true);
+        }
+
+        return separated;
+    }
+
+ /**
+ * Splits the partition update up and adds the existing state to each row.
+ * This data can be reused for multiple MV updates on the same base table
+ *
+ * @param partition the mutation
+ * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+ * since all of the update will already be present in the base table.
+ * @return The set of temoral rows contained in this update
+ */
+ public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
+ {
+ if (!updateAffectsView(partition))
+ return null;
+
+ Set<ColumnIdentifier> columns = new HashSet<>(this.columns.primaryKeyDefs.size());
+ for (ColumnDefinition def : this.columns.primaryKeyDefs)
+ columns.add(def.name);
+
+ TemporalRow.Set rowSet;
+ if (existing == null)
+ {
+ rowSet = separateRows(partition, columns);
+
+ // If we are building the view, we do not want to add old values; they will always be the same
+ if (!isBuilding)
+ readLocalRows(rowSet);
+ }
+ else
+ {
+ rowSet = existing.withNewViewPrimaryKey(columns);
+ }
+
+ return rowSet;
+ }
+
+
+    /**
+     * Creates the view mutations for a base table update: per temporal row an optional tombstone for the previous
+     * view row followed by the new view row, and finally the tombstones required by the update's deletions.
+     *
+     * @param partition The base table update
+     * @param rowSet The temporal rows of the update; only iterated once the update is known to affect the view
+     * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+     *                   since all of the update will already be present in the base table.
+     * @return View mutations which represent the changes necessary as long as previously created mutations for the view
+     *         have been applied successfully. This is based solely on the changes that are necessary given the current
+     *         state of the base table and the newly applying partition data. Null if there is nothing to apply.
+     */
+    public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
+    {
+        if (!updateAffectsView(partition))
+            return null;
+
+        List<Mutation> collected = new LinkedList<>();
+
+        for (TemporalRow temporalRow : rowSet)
+        {
+            // If we are building, there is no need to check for partition tombstones; those values will not be present
+            // in the partition data
+            PartitionUpdate tombstone = isBuilding ? null : createRangeTombstoneForRow(temporalRow);
+            if (tombstone != null)
+                collected.add(new Mutation(tombstone));
+
+            PartitionUpdate insert = createUpdatesForInserts(temporalRow);
+            if (insert != null)
+                collected.add(new Mutation(insert));
+        }
+
+        if (!isBuilding)
+        {
+            Collection<Mutation> deletions = createForDeletionInfo(rowSet, partition);
+            if (deletions != null)
+                collected.addAll(deletions);
+        }
+
+        // Callers expect null rather than an empty collection
+        return collected.isEmpty() ? null : collected;
+    }
+
+    /**
+     * Stops the builder which is currently registered for this view, if any, and submits a new one to the
+     * compaction manager.
+     */
+    public synchronized void build()
+    {
+        if (this.builder != null)
+            this.builder.stop();
+
+        this.builder = new ViewBuilder(baseCfs, this);
+        CompactionManager.instance.submitViewBuilder(this.builder);
+    }
+
+    /**
+     * Looks up the metadata of the base table of a view.
+     *
+     * @return the base table's metadata as returned by the schema, or null if the schema has no such view
+     */
+    @Nullable
+    public static CFMetaData findBaseTable(String keyspace, String viewName)
+    {
+        ViewDefinition viewDefinition = Schema.instance.getView(keyspace, viewName);
+        if (viewDefinition == null)
+            return null;
+
+        return Schema.instance.getCFMetaData(viewDefinition.baseTableId);
+    }
+
+    /**
+     * @return the definitions of the keyspace's views whose base table id equals the id of the given table
+     */
+    public static Iterable<ViewDefinition> findAll(String keyspace, String baseTable)
+    {
+        KeyspaceMetadata keyspaceMetadata = Schema.instance.getKSMetaData(keyspace);
+        final UUID baseTableId = Schema.instance.getId(keyspace, baseTable);
+
+        return Iterables.filter(keyspaceMetadata.views,
+                                candidate -> candidate.baseTableId.equals(baseTableId));
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
new file mode 100644
index 0000000..62aa332
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+ /**
+ * Task, submitted through the compaction manager, which populates a view by walking the partition keys found in
+ * the sstables of its base table and applying the view mutations generated for each of them.
+ * Progress is recorded in the system keyspace so that an interrupted build can resume.
+ */
+ public class ViewBuilder extends CompactionInfo.Holder
+ {
+ private final ColumnFamilyStore baseCfs;
+ private final View view;
+ private final UUID compactionId;
+ // Token most recently recorded as built; written by run() and read by getCompactionInfo(), hence volatile
+ private volatile Token prevToken = null;
+
+ private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);
+
+ // Set by stop(); checked by run() between keys
+ private volatile boolean isStopped = false;
+
+ public ViewBuilder(ColumnFamilyStore baseCfs, View view)
+ {
+ this.baseCfs = baseCfs;
+ this.view = view;
+ compactionId = UUIDGen.getTimeUUID();
+ }
+
+ /**
+ * Reads the base partition for the key and applies the view mutations generated from it.
+ */
+ private void buildKey(DecoratedKey key)
+ {
+ QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
+
+ while (!pager.isExhausted())
+ {
+ try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+ PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
+ {
+ // An empty page ends the build of this key
+ if (!partitionIterator.hasNext())
+ return;
+
+ try (RowIterator rowIterator = partitionIterator.next())
+ {
+ FilteredPartition partition = FilteredPartition.create(rowIterator);
+ // isBuilding = true: no existing row set is passed in and the stored values are not queried
+ TemporalRow.Set temporalRows = view.getTemporalRowSet(partition, null, true);
+
+ Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true);
+
+ if (mutations != null)
+ StorageProxy.mutateMV(key.getKey(), mutations, true);
+ }
+ }
+ }
+ }
+
+ /**
+ * Builds the view for every key in the selected sstables whose token falls in one of the local ranges.
+ * Does nothing if the view is already marked as built. On failure a new builder is scheduled to run in 5 minutes.
+ */
+ public void run()
+ {
+ String ksname = baseCfs.metadata.ksName, viewName = view.name;
+
+ if (SystemKeyspace.isViewBuilt(ksname, viewName))
+ return;
+
+ Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+ final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
+ Token lastToken;
+ Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
+ if (buildStatus == null)
+ {
+ // No recorded status: flush, then record the highest sstable generation as the start of the build
+ baseCfs.forceBlockingFlush();
+ function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL);
+ int generation = Integer.MIN_VALUE;
+
+ try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
+ {
+ for (SSTableReader reader : temp)
+ {
+ generation = Math.max(reader.descriptor.generation, generation);
+ }
+ }
+
+ SystemKeyspace.beginViewBuild(ksname, viewName, generation);
+ lastToken = null;
+ }
+ else
+ {
+ // Recorded status found: only use sstables up to the recorded generation and continue after the recorded token
+ function = new Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>>()
+ {
+ @Nullable
+ public Iterable<SSTableReader> apply(org.apache.cassandra.db.lifecycle.View view)
+ {
+ Iterable<SSTableReader> readers = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL).apply(view);
+ if (readers != null)
+ return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
+ return null;
+ }
+ };
+ lastToken = buildStatus.right;
+ }
+
+ prevToken = lastToken;
+ try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
+ ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
+ {
+ while (!isStopped && iter.hasNext())
+ {
+ DecoratedKey key = iter.next();
+ Token token = key.getToken();
+ // Keys are skipped until the first token greater than the recorded one
+ if (lastToken == null || lastToken.compareTo(token) < 0)
+ {
+ for (Range<Token> range : ranges)
+ {
+ if (range.contains(token))
+ {
+ buildKey(key);
+
+ // Record progress only when the token changes
+ if (prevToken == null || prevToken.compareTo(token) != 0)
+ {
+ SystemKeyspace.updateViewBuildStatus(ksname, viewName, key.getToken());
+ prevToken = token;
+ }
+ }
+ }
+ // Once the first key past the recorded token has been seen, no further key is skipped
+ lastToken = null;
+ }
+ }
+
+ // A stopped build is not marked as finished
+ if (!isStopped)
+ SystemKeyspace.finishViewBuildStatus(ksname, viewName);
+
+ }
+ catch (Exception e)
+ {
+ final ViewBuilder builder = new ViewBuilder(baseCfs, view);
+ ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(builder),
+ 5,
+ TimeUnit.MINUTES);
+ logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
+ }
+ }
+
+ /**
+ * @return progress of the build, expressed as a number of local ranges out of the total number of local ranges
+ */
+ public CompactionInfo getCompactionInfo()
+ {
+ long rangesLeft = 0, rangesTotal = 0;
+ Token lastToken = prevToken;
+
+ // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
+ // percentage of a range covered by a second range, this is the best approximation that we can calculate.
+ // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
+ // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
+ // has.
+ for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
+ {
+ rangesLeft++;
+ rangesTotal++;
+ // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
+ // end of the method.
+ if (lastToken == null || range.contains(lastToken))
+ rangesLeft = 0;
+ }
+ return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
+ }
+
+ /**
+ * Requests that run() stop; the flag is checked before each key is processed.
+ */
+ public void stop()
+ {
+ isStopped = true;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
new file mode 100644
index 0000000..2364ed1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Manages {@link View}'s for a single {@link ColumnFamilyStore}. All of the views for that table are created when this
+ * manager is initialized.
+ *
+ * The main purposes of the manager are to provide a single location for updates to be vetted to see whether they update
+ * any views {@link ViewManager#updatesAffectView(Collection, boolean)}, provide locks to prevent multiple
+ * updates from creating incoherent updates in the view {@link ViewManager#acquireLockFor(ByteBuffer)}, and
+ * to affect change on the view.
+ */
+public class ViewManager
+{
+ public class ForStore
+ {
+ private final ConcurrentNavigableMap<String, View> viewsByName;
+
+ public ForStore()
+ {
+ this.viewsByName = new ConcurrentSkipListMap<>();
+ }
+
+ public Iterable<View> allViews()
+ {
+ return viewsByName.values();
+ }
+
+ public Iterable<ColumnFamilyStore> allViewsCfs()
+ {
+ List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
+ for (View view : allViews())
+ viewColumnFamilies.add(keyspace.getColumnFamilyStore(view.getDefinition().viewName));
+ return viewColumnFamilies;
+ }
+
+ public void forceBlockingFlush()
+ {
+ for (ColumnFamilyStore viewCfs : allViewsCfs())
+ viewCfs.forceBlockingFlush();
+ }
+
+ public void dumpMemtables()
+ {
+ for (ColumnFamilyStore viewCfs : allViewsCfs())
+ viewCfs.dumpMemtable();
+ }
+
+ public void truncateBlocking(long truncatedAt)
+ {
+ for (ColumnFamilyStore viewCfs : allViewsCfs())
+ {
+ ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
+ SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
+ }
+ }
+
+ public void addView(View view)
+ {
+ viewsByName.put(view.name, view);
+ }
+
+ public void removeView(String name)
+ {
+ viewsByName.remove(name);
+ }
+ }
+
+ private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentViewWriters() * 1024);
+
+ private static final boolean enableCoordinatorBatchlog = Boolean.getBoolean("cassandra.mv_enable_coordinator_batchlog");
+
+ private final ConcurrentNavigableMap<UUID, ForStore> viewManagersByStore;
+ private final ConcurrentNavigableMap<String, View> viewsByName;
+ private final Keyspace keyspace;
+
+ public ViewManager(Keyspace keyspace)
+ {
+ this.viewManagersByStore = new ConcurrentSkipListMap<>();
+ this.viewsByName = new ConcurrentSkipListMap<>();
+ this.keyspace = keyspace;
+ }
+
+ /**
+ * Calculates and pushes updates to the views replicas. The replicas are determined by
+ * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+ */
+ public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
+ {
+ List<Mutation> mutations = null;
+ TemporalRow.Set temporalRows = null;
+ for (Map.Entry<String, View> view : viewsByName.entrySet())
+ {
+ temporalRows = view.getValue().getTemporalRowSet(update, temporalRows, false);
+
+ Collection<Mutation> viewMutations = view.getValue().createMutations(update, temporalRows, false);
+ if (viewMutations != null && !viewMutations.isEmpty())
+ {
+ if (mutations == null)
+ mutations = Lists.newLinkedList();
+ mutations.addAll(viewMutations);
+ }
+ }
+
+ if (mutations != null)
+ StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
+ }
+
+ public boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean coordinatorBatchlog)
+ {
+ if (coordinatorBatchlog && !enableCoordinatorBatchlog)
+ return false;
+
+ for (IMutation mutation : mutations)
+ {
+ for (PartitionUpdate cf : mutation.getPartitionUpdates())
+ {
+ assert keyspace.getName().equals(cf.metadata().ksName);
+
+ if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
+ continue;
+
+ for (View view : allViews())
+ {
+ if (!cf.metadata().cfId.equals(view.getDefinition().baseTableId))
+ continue;
+
+ if (view.updateAffectsView(cf))
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public Iterable<View> allViews()
+ {
+ return viewsByName.values();
+ }
+
+ public void update(String viewName)
+ {
+ View view = viewsByName.get(viewName);
+ assert view != null : "When updating a view, it should already be in the ViewManager";
+ view.build();
+
+ // We provide the new definition from the base metadata
+ Optional<ViewDefinition> viewDefinition = keyspace.getMetadata().views.get(viewName);
+ assert viewDefinition.isPresent() : "When updating a view, it should still be in the Keyspaces views";
+ view.updateDefinition(viewDefinition.get());
+ }
+
+ public void reload()
+ {
+ Map<String, ViewDefinition> newViewsByName = new HashMap<>();
+ for (ViewDefinition definition : keyspace.getMetadata().views)
+ {
+ newViewsByName.put(definition.viewName, definition);
+ }
+
+ for (String viewName : viewsByName.keySet())
+ {
+ if (!newViewsByName.containsKey(viewName))
+ removeView(viewName);
+ }
+
+ for (Map.Entry<String, ViewDefinition> entry : newViewsByName.entrySet())
+ {
+ if (!viewsByName.containsKey(entry.getKey()))
+ addView(entry.getValue());
+ }
+
+ for (View view : allViews())
+ {
+ view.build();
+ // We provide the new definition from the base metadata
+ view.updateDefinition(newViewsByName.get(view.name));
+ }
+ }
+
+ public void addView(ViewDefinition definition)
+ {
+ View view = new View(definition, keyspace.getColumnFamilyStore(definition.baseTableId));
+ forTable(view.getDefinition().baseTableId).addView(view);
+ viewsByName.put(definition.viewName, view);
+ }
+
+ public void removeView(String name)
+ {
+ View view = viewsByName.remove(name);
+
+ if (view == null)
+ return;
+
+ forTable(view.getDefinition().baseTableId).removeView(name);
+ SystemKeyspace.setViewRemoved(keyspace.getName(), view.name);
+ }
+
+ public void buildAllViews()
+ {
+ for (View view : allViews())
+ view.build();
+ }
+
+ public ForStore forTable(UUID baseId)
+ {
+ ForStore forStore = viewManagersByStore.get(baseId);
+ if (forStore == null)
+ {
+ forStore = new ForStore();
+ ForStore previous = viewManagersByStore.put(baseId, forStore);
+ if (previous != null)
+ forStore = previous;
+ }
+ return forStore;
+ }
+
+ public static Lock acquireLockFor(ByteBuffer key)
+ {
+ Lock lock = LOCKS.get(key);
+
+ if (lock.tryLock())
+ return lock;
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
new file mode 100644
index 0000000..628142d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public final class ViewUtils
+{
+    private ViewUtils()
+    {
+    }
+
+    /**
+     * Calculate the natural endpoint for the view.
+     *
+     * The view natural endpoint is the endpoint which has the same cardinality as this node in the replication factor.
+     * The cardinality is the number at which this node would store a piece of data, given the change in replication
+     * factor.
+     *
+     * For example, if we have the following ring:
+     *   A, T1 -> B, T2 -> C, T3 -> A
+     *
+     * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would
+     * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality
+     * for T3 is 2.
+     *
+     * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be:
+     *  A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3)
+     *  B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
+     *  C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
+     *
+     * Only endpoints in the local data center are paired. A node which is itself a replica of the view token is
+     * always paired with itself.
+     *
+     * @throws RuntimeException if this method is called using a base token which does not belong to this replica
+     *                          and there are no pending endpoints for the view token
+     */
+    public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+    {
+        AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
+
+        InetAddress localAddress = FBUtilities.getBroadcastAddress();
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localAddress);
+
+        List<InetAddress> baseReplicas = new ArrayList<>();
+        List<InetAddress> viewReplicas = new ArrayList<>();
+
+        for (InetAddress candidate : replicationStrategy.getNaturalEndpoints(baseToken))
+        {
+            if (isInDataCenter(candidate, localDataCenter))
+                baseReplicas.add(candidate);
+        }
+
+        for (InetAddress candidate : replicationStrategy.getNaturalEndpoints(viewToken))
+        {
+            // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
+            if (candidate.equals(localAddress))
+                return candidate;
+
+            // An endpoint shared between the base and the view would select itself, so it is taken out of the base
+            // list (and never added to the view list) to keep the positions of the remaining endpoints aligned
+            if (!baseReplicas.remove(candidate) && isInDataCenter(candidate, localDataCenter))
+                viewReplicas.add(candidate);
+        }
+
+        // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
+        // Since the same replication strategy is used, the same placement should be used and we should get the same
+        // number of replicas for all of the tokens in the ring.
+        assert baseReplicas.size() == viewReplicas.size() : "Replication strategy should have the same number of endpoints for the base and the view";
+
+        int localPosition = baseReplicas.indexOf(localAddress);
+        if (localPosition >= 0)
+            return viewReplicas.get(localPosition);
+
+        // Since there are pending endpoints the write is going to be stored in the batchlog regardless,
+        // so we can pretend we are the view's endpoint.
+        if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0)
+            return localAddress;
+
+        throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
+    }
+
+    /**
+     * @return true if the snitch places the endpoint in the given data center
+     */
+    private static boolean isInDataCenter(InetAddress endpoint, String dataCenter)
+    {
+        return DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint).equals(dataCenter);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index 46872c1..bbf6fd6 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -276,7 +276,7 @@ public class ByteOrderedPartitioner implements IPartitioner
for (String ks : Schema.instance.getKeyspaces())
{
- for (CFMetaData cfmd : Schema.instance.getTables(ks))
+ for (CFMetaData cfmd : Schema.instance.getTablesAndViews(ks))
{
for (Range<Token> r : sortedRanges)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 464ac3d..96b4ca0 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -216,7 +216,7 @@ public class OrderPreservingPartitioner implements IPartitioner
for (String ks : Schema.instance.getKeyspaces())
{
- for (CFMetaData cfmd : Schema.instance.getTables(ks))
+ for (CFMetaData cfmd : Schema.instance.getTablesAndViews(ks))
{
for (Range<Token> r : sortedRanges)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
deleted file mode 100644
index 39a5574..0000000
--- a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.metrics;
-
-import com.codahale.metrics.Counter;
-
-import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
-public class MVWriteMetrics extends ClientRequestMetrics
-{
- public final Counter viewReplicasAttempted;
- public final Counter viewReplicasSuccess;
-
- public MVWriteMetrics(String scope) {
- super(scope);
- viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
- viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
- }
-
- public void release()
- {
- super.release();
- Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
- Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
new file mode 100644
index 0000000..c99cc5c
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class ViewWriteMetrics extends ClientRequestMetrics
+{
+ public final Counter viewReplicasAttempted;
+ public final Counter viewReplicasSuccess;
+
+ public ViewWriteMetrics(String scope) {
+ super(scope);
+ viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
+ viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
+ }
+
+ public void release()
+ {
+ super.release();
+ Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
+ Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 372ff6e..0a0d9d8 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -22,9 +22,12 @@ import java.util.Optional;
import java.util.Set;
import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
+import org.apache.avro.reflect.Nullable;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.exceptions.ConfigurationException;
/**
@@ -35,51 +38,72 @@ public final class KeyspaceMetadata
public final String name;
public final KeyspaceParams params;
public final Tables tables;
+ public final Views views;
public final Types types;
public final Functions functions;
- private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Types types, Functions functions)
+ private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
{
this.name = name;
this.params = params;
this.tables = tables;
+ this.views = views;
this.types = types;
this.functions = functions;
}
public static KeyspaceMetadata create(String name, KeyspaceParams params)
{
- return new KeyspaceMetadata(name, params, Tables.none(), Types.none(), Functions.none());
+ return new KeyspaceMetadata(name, params, Tables.none(), Views.none(), Types.none(), Functions.none());
}
public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables)
{
- return new KeyspaceMetadata(name, params, tables, Types.none(), Functions.none());
+ return new KeyspaceMetadata(name, params, tables, Views.none(), Types.none(), Functions.none());
}
- public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Types types, Functions functions)
+ public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
{
- return new KeyspaceMetadata(name, params, tables, types, functions);
+ return new KeyspaceMetadata(name, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(KeyspaceParams params)
{
- return new KeyspaceMetadata(name, params, tables, types, functions);
+ return new KeyspaceMetadata(name, params, tables, views, types, functions);
}
- public KeyspaceMetadata withSwapped(Tables tables)
+ public KeyspaceMetadata withSwapped(Tables regular)
{
- return new KeyspaceMetadata(name, params, tables, types, functions);
+ return new KeyspaceMetadata(name, params, regular, views, types, functions);
+ }
+
+ public KeyspaceMetadata withSwapped(Views views)
+ {
+ return new KeyspaceMetadata(name, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Types types)
{
- return new KeyspaceMetadata(name, params, tables, types, functions);
+ return new KeyspaceMetadata(name, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Functions functions)
{
- return new KeyspaceMetadata(name, params, tables, types, functions);
+ return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ }
+
+ public Iterable<CFMetaData> tablesAndViews()
+ {
+ return Iterables.concat(tables, views.metadatas());
+ }
+
+ @Nullable
+ public CFMetaData getTableOrViewNullable(String tableOrViewName)
+ {
+ ViewDefinition view = views.getNullable(tableOrViewName);
+ return view == null
+ ? tables.getNullable(tableOrViewName)
+ : view.metadata;
}
public Set<String> existingIndexNames(String cfToExclude)
@@ -94,7 +118,7 @@ public final class KeyspaceMetadata
public Optional<CFMetaData> findIndexedTable(String indexName)
{
- for (CFMetaData cfm : tables)
+ for (CFMetaData cfm : tablesAndViews())
if (cfm.getIndexes().has(indexName))
return Optional.of(cfm);
@@ -104,7 +128,7 @@ public final class KeyspaceMetadata
@Override
public int hashCode()
{
- return Objects.hashCode(name, params, tables, functions, types);
+ return Objects.hashCode(name, params, tables, views, functions, types);
}
@Override
@@ -121,6 +145,7 @@ public final class KeyspaceMetadata
return name.equals(other.name)
&& params.equals(other.params)
&& tables.equals(other.tables)
+ && views.equals(other.views)
&& functions.equals(other.functions)
&& types.equals(other.types);
}
@@ -132,6 +157,7 @@ public final class KeyspaceMetadata
.add("name", name)
.add("params", params)
.add("tables", tables)
+ .add("views", views)
.add("functions", functions)
.add("types", types)
.toString();
@@ -144,6 +170,6 @@ public final class KeyspaceMetadata
+ "or contain non-alphanumeric-underscore characters (got \"%s\")",
Schema.NAME_LENGTH,
name));
- tables.forEach(CFMetaData::validate);
+ tablesAndViews().forEach(CFMetaData::validate);
}
}