You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/07/30 02:55:22 UTC
[4/7] cassandra git commit: Revert "Revert "Materialized Views""
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
new file mode 100644
index 0000000..2ddc6ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Fluent builder for {@link ReadCommand}s against a single {@link ColumnFamilyStore}.
+ *
+ * Clustering restrictions are expressed either as a slice (fromIncl/fromExcl/toIncl/toExcl) or as an
+ * explicit set of rows (includeRow); the two styles are mutually exclusive, which is checked with
+ * assertions. Concrete subclasses decide which partition(s) the command reads.
+ */
+public abstract class AbstractReadCommandBuilder
+{
+    protected final ColumnFamilyStore cfs;
+    protected int nowInSeconds;
+
+    // A negative value means "not set": no CQL limit / no paging limit (see makeLimits()).
+    private int cqlLimit = -1;
+    private int pagingLimit = -1;
+    protected boolean reversed = false;
+
+    // Columns to select; null or empty selects every column of the table (see makeColumnFilter()).
+    protected Set<ColumnIdentifier> columns;
+    protected final RowFilter filter = RowFilter.create();
+
+    // Slice bounds; a null bound leaves the slice open on that side (BOTTOM/TOP in makeFilter()).
+    private Slice.Bound lowerClusteringBound;
+    private Slice.Bound upperClusteringBound;
+
+    // Rows selected by name, ordered by the table's clustering comparator.
+    private NavigableSet<Clustering> clusterings;
+
+    // Use Util.cmd() instead of this ctor directly
+    AbstractReadCommandBuilder(ColumnFamilyStore cfs)
+    {
+        this.cfs = cfs;
+        this.nowInSeconds = FBUtilities.nowInSeconds();
+    }
+
+    /** Overrides the query time, which otherwise defaults to FBUtilities.nowInSeconds() at construction. */
+    public AbstractReadCommandBuilder withNowInSeconds(int nowInSec)
+    {
+        this.nowInSeconds = nowInSec;
+        return this;
+    }
+
+    /** Sets an inclusive lower clustering bound built from the given clustering values. */
+    public AbstractReadCommandBuilder fromIncl(Object... values)
+    {
+        assert lowerClusteringBound == null && clusterings == null;
+        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values);
+        return this;
+    }
+
+    /** Sets an exclusive lower clustering bound built from the given clustering values. */
+    public AbstractReadCommandBuilder fromExcl(Object... values)
+    {
+        assert lowerClusteringBound == null && clusterings == null;
+        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values);
+        return this;
+    }
+
+    /** Sets an inclusive upper clustering bound built from the given clustering values. */
+    public AbstractReadCommandBuilder toIncl(Object... values)
+    {
+        assert upperClusteringBound == null && clusterings == null;
+        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values);
+        return this;
+    }
+
+    /** Sets an exclusive upper clustering bound built from the given clustering values. */
+    public AbstractReadCommandBuilder toExcl(Object... values)
+    {
+        assert upperClusteringBound == null && clusterings == null;
+        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values);
+        return this;
+    }
+
+    /** Adds one row, identified by its clustering values, to the set of rows read by name. */
+    public AbstractReadCommandBuilder includeRow(Object... values)
+    {
+        assert lowerClusteringBound == null && upperClusteringBound == null;
+
+        if (this.clusterings == null)
+            this.clusterings = new TreeSet<>(cfs.metadata.comparator);
+
+        this.clusterings.add(cfs.metadata.comparator.make(values));
+        return this;
+    }
+
+    /** Requests results in reversed clustering order. */
+    public AbstractReadCommandBuilder reverse()
+    {
+        this.reversed = true;
+        return this;
+    }
+
+    /** Sets the CQL row limit; a negative value means no limit. */
+    public AbstractReadCommandBuilder withLimit(int newLimit)
+    {
+        this.cqlLimit = newLimit;
+        return this;
+    }
+
+    /** Sets the paging limit; a negative value disables paging. */
+    public AbstractReadCommandBuilder withPagingLimit(int newLimit)
+    {
+        this.pagingLimit = newLimit;
+        return this;
+    }
+
+    /** Restricts the selection to the named columns (may be called repeatedly to add more). */
+    public AbstractReadCommandBuilder columns(String... columns)
+    {
+        if (this.columns == null)
+            this.columns = new HashSet<>();
+
+        for (String column : columns)
+            this.columns.add(ColumnIdentifier.getInterned(column, true));
+        return this;
+    }
+
+    /** Serializes {@code value} with {@code type}; values that are already ByteBuffers are passed through as-is. */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private ByteBuffer bb(Object value, AbstractType<?> type)
+    {
+        return value instanceof ByteBuffer ? (ByteBuffer) value : ((AbstractType) type).decompose(value);
+    }
+
+    /** Returns the type of the elements a CONTAINS restriction compares against for the given collection type. */
+    private AbstractType<?> forValues(AbstractType<?> collectionType)
+    {
+        assert collectionType instanceof CollectionType : "CONTAINS requires a collection column, got " + collectionType;
+        CollectionType ct = (CollectionType) collectionType;
+        switch (ct.kind)
+        {
+            case LIST:
+            case MAP:
+                return ct.valueComparator();
+            case SET:
+                return ct.nameComparator();
+        }
+        throw new AssertionError("Unsupported collection kind for CONTAINS: " + ct.kind);
+    }
+
+    /** Returns the type of the keys a CONTAINS_KEY restriction compares against for the given collection type. */
+    private AbstractType<?> forKeys(AbstractType<?> collectionType)
+    {
+        assert collectionType instanceof CollectionType : "CONTAINS KEY requires a collection column, got " + collectionType;
+        CollectionType ct = (CollectionType) collectionType;
+        switch (ct.kind)
+        {
+            case LIST:
+            case MAP:
+                return ct.nameComparator();
+        }
+        throw new AssertionError("Unsupported collection kind for CONTAINS KEY: " + ct.kind);
+    }
+
+    /**
+     * Adds a row filter expression {@code column op value}. For CONTAINS / CONTAINS_KEY the value is
+     * serialized with the collection's element / key type rather than the column type.
+     */
+    public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value)
+    {
+        ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true));
+        assert def != null : "Unknown column " + column + " in table " + cfs.metadata.cfName;
+
+        AbstractType<?> type = def.type;
+        if (op == Operator.CONTAINS)
+            type = forValues(type);
+        else if (op == Operator.CONTAINS_KEY)
+            type = forKeys(type);
+
+        this.filter.add(def, op, bb(value, type));
+        return this;
+    }
+
+    /** Builds the column selection: all columns when none were requested, otherwise exactly the requested ones. */
+    protected ColumnFilter makeColumnFilter()
+    {
+        if (columns == null || columns.isEmpty())
+            return ColumnFilter.all(cfs.metadata);
+
+        // Named "selection" so it does not shadow the row filter field "filter".
+        ColumnFilter.Builder selection = ColumnFilter.selectionBuilder();
+        for (ColumnIdentifier column : columns)
+        {
+            ColumnDefinition def = cfs.metadata.getColumnDefinition(column);
+            assert def != null : "Unknown column " + column + " in table " + cfs.metadata.cfName;
+            selection.add(def);
+        }
+        return selection.build();
+    }
+
+    /** Builds a names filter when rows were included by name, otherwise a single-slice filter from the bounds. */
+    protected ClusteringIndexFilter makeFilter()
+    {
+        if (clusterings != null)
+            return new ClusteringIndexNamesFilter(clusterings, reversed);
+
+        Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound,
+                                 upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound);
+        return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed);
+    }
+
+    /** Combines the CQL limit and the paging limit; negative values mean "not set". */
+    protected DataLimits makeLimits()
+    {
+        DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit);
+        if (pagingLimit >= 0)
+            limits = limits.forPaging(pagingLimit);
+        return limits;
+    }
+
+    /** Creates the configured read command. */
+    public abstract ReadCommand build();
+
+    /** Builds a single-partition read of {@code partitionKey}. */
+    public static class SinglePartitionBuilder extends AbstractReadCommandBuilder
+    {
+        private final DecoratedKey partitionKey;
+
+        public SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key)
+        {
+            super(cfs);
+            this.partitionKey = key;
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
+        }
+    }
+
+    /**
+     * Builds a single-partition slice read. The clustering filter comes only from the slices added with
+     * addSlice(); the bounds/rows set through the base-class methods are not used by makeFilter().
+     */
+    public static class SinglePartitionSliceBuilder extends AbstractReadCommandBuilder
+    {
+        private final DecoratedKey partitionKey;
+        private final Slices.Builder sliceBuilder;
+
+        public SinglePartitionSliceBuilder(ColumnFamilyStore cfs, DecoratedKey key)
+        {
+            super(cfs);
+            this.partitionKey = key;
+            this.sliceBuilder = new Slices.Builder(cfs.getComparator());
+        }
+
+        /** Adds one clustering slice to read. */
+        public SinglePartitionSliceBuilder addSlice(Slice slice)
+        {
+            sliceBuilder.add(slice);
+            return this;
+        }
+
+        @Override
+        protected ClusteringIndexFilter makeFilter()
+        {
+            return new ClusteringIndexSliceFilter(sliceBuilder.build(), reversed);
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            return SinglePartitionSliceCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
+        }
+    }
+
+    /** Builds a partition-range read; unset start/end keys leave the range open on that side. */
+    public static class PartitionRangeBuilder extends AbstractReadCommandBuilder
+    {
+        private DecoratedKey startKey;
+        private boolean startInclusive;
+        private DecoratedKey endKey;
+        private boolean endInclusive;
+
+        public PartitionRangeBuilder(ColumnFamilyStore cfs)
+        {
+            super(cfs);
+        }
+
+        /** Sets an inclusive start key built from the partition key values (or a single DecoratedKey). */
+        public PartitionRangeBuilder fromKeyIncl(Object... values)
+        {
+            assert startKey == null;
+            this.startInclusive = true;
+            this.startKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        /** Sets an exclusive start key built from the partition key values (or a single DecoratedKey). */
+        public PartitionRangeBuilder fromKeyExcl(Object... values)
+        {
+            assert startKey == null;
+            this.startInclusive = false;
+            this.startKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        /** Sets an inclusive end key built from the partition key values (or a single DecoratedKey). */
+        public PartitionRangeBuilder toKeyIncl(Object... values)
+        {
+            assert endKey == null;
+            this.endInclusive = true;
+            this.endKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        /** Sets an exclusive end key built from the partition key values (or a single DecoratedKey). */
+        public PartitionRangeBuilder toKeyExcl(Object... values)
+        {
+            assert endKey == null;
+            this.endInclusive = false;
+            this.endKey = makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            // A missing key is replaced by the partitioner's minimum token maxKeyBound(), exclusive as a start
+            // and inclusive as an end. Locals are used so that build() leaves the builder's fields untouched.
+            PartitionPosition start = startKey;
+            boolean includeStart = startInclusive;
+            if (start == null)
+            {
+                start = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+                includeStart = false;
+            }
+
+            PartitionPosition end = endKey;
+            boolean includeEnd = endInclusive;
+            if (end == null)
+            {
+                end = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+                includeEnd = true;
+            }
+
+            // Pick the bounds flavour matching the inclusiveness of each side.
+            AbstractBounds<PartitionPosition> bounds;
+            if (includeStart && includeEnd)
+                bounds = new Bounds<>(start, end);
+            else if (includeStart)
+                bounds = new IncludingExcludingBounds<>(start, end);
+            else if (includeEnd)
+                bounds = new Range<>(start, end);
+            else
+                bounds = new ExcludingBounds<>(start, end);
+
+            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+        }
+
+        /**
+         * Builds a DecoratedKey from partition key component values. A single DecoratedKey argument is
+         * returned unchanged.
+         */
+        static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
+        {
+            if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
+                return (DecoratedKey) partitionKey[0];
+
+            ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
+            return StorageService.getPartitioner().decorateKey(key);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6ac132c..24da365 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.view.MaterializedViewManager;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
@@ -159,6 +160,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
public final SecondaryIndexManager indexManager;
+ public final MaterializedViewManager materializedViewManager;
/* These are locally held copies to be changed from the config during runtime */
private volatile DefaultInteger minCompactionThreshold;
@@ -195,6 +197,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
indexManager.reload();
+ materializedViewManager.reload();
// If the CF comparator has changed, we need to change the memtable,
// because the old one still aliases the previous comparator.
if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
@@ -331,6 +334,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
this.partitioner = partitioner;
this.directories = directories;
this.indexManager = new SecondaryIndexManager(this);
+ this.materializedViewManager = new MaterializedViewManager(this);
this.metric = new TableMetrics(this);
fileIndexGenerator.set(generation);
sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -451,6 +455,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
SystemKeyspace.removeTruncationRecord(metadata.cfId);
data.dropSSTables();
indexManager.invalidate();
+ materializedViewManager.invalidate();
invalidateCaches();
}
@@ -572,8 +577,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// must be called after all sstables are loaded since row cache merges all row versions
- public void initRowCache()
+ public void init()
{
+ materializedViewManager.init();
+
if (!isRowCacheEnabled())
return;
@@ -1806,7 +1813,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
cfs.data.reset();
return null;
}
- }, true);
+ }, true, false);
}
}
@@ -1834,19 +1841,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// flush the CF being truncated before forcing the new segment
forceBlockingFlush();
+ materializedViewManager.forceBlockingFlush();
+
// sleep a little to make sure that our truncatedAt comes after any sstable
// that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
else
{
- // just nuke the memtable data w/o writing to disk first
- synchronized (data)
- {
- final Flush flush = new Flush(true);
- flushExecutor.execute(flush);
- postFlushExecutor.submit(flush.postFlush);
- }
+ dumpMemtable();
+ materializedViewManager.dumpMemtables();
}
Runnable truncateRunnable = new Runnable()
@@ -1866,17 +1870,32 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (SecondaryIndex index : indexManager.getIndexes())
index.truncateBlocking(truncatedAt);
+ materializedViewManager.truncateBlocking(truncatedAt);
+
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
logger.debug("cleaning out row cache");
invalidateCaches();
}
};
- runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
+ runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
logger.debug("truncate complete");
}
- public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
+ /**
+ * Drops the current memtable without flushing it to disk. This should only be called when truncating a column family which is not durable.
+ *
+ * While holding the lock on {@code data}, a Flush created with {@code new Flush(true)} is handed to
+ * flushExecutor and its post-flush task to postFlushExecutor. The returned future is discarded, so this
+ * method does not wait for either step to complete.
+ */
+ public void dumpMemtable()
+ {
+ synchronized (data)
+ {
+ final Flush flush = new Flush(true);
+ flushExecutor.execute(flush);
+ postFlushExecutor.submit(flush.postFlush);
+ }
+ }
+
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
{
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
@@ -1884,17 +1903,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
- Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
- for (ColumnFamilyStore cfs : selfWithIndexes)
+ Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
+ ? Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs())
+ : concatWithIndexes();
+
+ for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
cfs.getCompactionStrategyManager().pause();
try
{
// interrupt in-progress compactions
- CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation);
- CompactionManager.instance.waitForCessation(selfWithIndexes);
+ CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation);
+ CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
// doublecheck that we finished, instead of timing out
- for (ColumnFamilyStore cfs : selfWithIndexes)
+ for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
{
if (!cfs.getTracker().getCompacting().isEmpty())
{
@@ -1916,7 +1938,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
finally
{
- for (ColumnFamilyStore cfs : selfWithIndexes)
+ for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
cfs.getCompactionStrategyManager().resume();
}
}
@@ -1936,7 +1958,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
};
- return runWithCompactionsDisabled(callable, false);
+ return runWithCompactionsDisabled(callable, false, false);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index f37ce66..78b593b 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -23,13 +23,15 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -37,14 +39,17 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.metrics.KeyspaceMetrics;
@@ -70,7 +75,10 @@ public class Keyspace
}
private volatile KeyspaceMetadata metadata;
- public final OpOrder writeOrder = new OpOrder();
+
+ //OpOrder is defined globally since we need to order writes across
+ //Keyspaces in the case of MaterializedViews (batchlog of MV mutations)
+ public static final OpOrder writeOrder = new OpOrder();
/* ColumnFamilyStore per column family */
private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
@@ -122,7 +130,7 @@ public class Keyspace
// keyspace has to be constructed and in the cache before cacheRow can be called
for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
- cfs.initRowCache();
+ cfs.init();
}
}
}
@@ -352,10 +360,14 @@ public class Keyspace
// CFS being created for the first time, either on server startup or new CF being added.
// We don't worry about races here; startup is safe, and adding multiple idential CFs
// simultaneously is a "don't do that" scenario.
- ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
+ ColumnFamilyStore newCfs = ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables);
+
+ ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, newCfs);
// CFS mbean instantiation will error out before we hit this, but in case that changes...
if (oldCfs != null)
throw new IllegalStateException("added multiple mappings for cf id " + cfId);
+
+ newCfs.init();
}
else
{
@@ -380,11 +392,41 @@ public class Keyspace
* @param writeCommitLog false to disable commitlog append entirely
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
*/
- public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+ public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes)
{
if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
throw new RuntimeException("Testing write failures");
+ Lock lock = null;
+ boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false);
+
+ if (requiresViewUpdate)
+ {
+ lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey());
+
+ if (lock == null)
+ {
+ if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+ {
+ logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
+ Tracing.trace("Could not acquire MV lock");
+ throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+ }
+ else
+ {
+ //This MV update can't happen right now. so rather than keep this thread busy
+ // we will re-apply ourself to the queue and try again later
+ StageManager.getStage(Stage.MUTATION).execute(() -> {
+ if (writeCommitLog)
+ mutation.apply();
+ else
+ mutation.applyUnsafe();
+ });
+
+ return;
+ }
+ }
+ }
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group opGroup = writeOrder.start())
{
@@ -401,10 +443,26 @@ public class Keyspace
ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId);
if (cfs == null)
{
- logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId);
+ logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().cfId, upd.metadata().ksName, upd.metadata().cfName);
continue;
}
+ if (requiresViewUpdate)
+ {
+ try
+ {
+ Tracing.trace("Create materialized view mutations from replica");
+ cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd);
+ }
+ catch (Exception e)
+ {
+ if (!(e instanceof WriteTimeoutException))
+ logger.warn("Encountered exception when creating materialized view mutations", e);
+
+ JVMStabilityInspector.inspectThrowable(e);
+ }
+ }
+
Tracing.trace("Adding to {} memtable", upd.metadata().cfName);
SecondaryIndexManager.Updater updater = updateIndexes
? cfs.indexManager.updaterFor(upd, opGroup, nowInSec)
@@ -412,6 +470,11 @@ public class Keyspace
cfs.apply(upd, updater, opGroup, replayPosition);
}
}
+ finally
+ {
+ if (lock != null)
+ lock.unlock();
+ }
}
public AbstractReplicationStrategy getReplicationStrategy()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 3d49ca6..ace114b 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -54,6 +54,8 @@ public class Mutation implements IMutation
// map of column family id to mutations for that column family.
private final Map<UUID, PartitionUpdate> modifications;
+ // Time at which this mutation was instantiated
+ public final long createdAt = System.currentTimeMillis();
public Mutation(String keyspaceName, DecoratedKey key)
{
this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 3baa93e..640e45f 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -22,6 +22,7 @@ import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@ -47,10 +48,17 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
replyTo = InetAddress.getByAddress(from);
}
+ try
+ {
message.payload.apply();
WriteResponse response = new WriteResponse();
Tracing.trace("Enqueuing response to {}", replyTo);
MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
+ }
+ catch (WriteTimeoutException wto)
+ {
+ Tracing.trace("Payload application resulted in WriteTimeout, not replying");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 7bfd552..7ea946b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -101,6 +101,8 @@ public final class SystemKeyspace
public static final String SSTABLE_ACTIVITY = "sstable_activity";
public static final String SIZE_ESTIMATES = "size_estimates";
public static final String AVAILABLE_RANGES = "available_ranges";
+ public static final String MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS = "materializedviews_builds_in_progress";
+ public static final String BUILT_MATERIALIZEDVIEWS = "built_materializedviews";
@Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
@Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
@@ -261,6 +263,24 @@ public final class SystemKeyspace
+ "ranges set<blob>,"
+ "PRIMARY KEY ((keyspace_name)))");
+ // Checkpoints of materialized view builds that have not been finished yet: one row per
+ // (keyspace_name, view_name) holding the build's generation_number and the last_token checkpoint,
+ // stored as text (the partitioner token factory's string form).
+ public static final CFMetaData MaterializedViewsBuildsInProgress =
+ compile(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS,
+ "materialized views builds current progress",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "view_name text,"
+ + "last_token varchar,"
+ + "generation_number int,"
+ + "PRIMARY KEY ((keyspace_name), view_name))");
+
+ // Marker table: a row exists for (keyspace_name, view_name) once that view's build has been
+ // recorded as complete. Note the table name is quoted in the DDL, unlike the table above.
+ public static final CFMetaData BuiltMaterializedViews =
+ compile(BUILT_MATERIALIZEDVIEWS,
+ "built materialized views",
+ "CREATE TABLE \"%s\" ("
+ + "keyspace_name text,"
+ + "view_name text,"
+ + "PRIMARY KEY ((keyspace_name), view_name))");
+
@Deprecated
public static final CFMetaData LegacyKeyspaces =
compile(LEGACY_KEYSPACES,
@@ -401,6 +421,8 @@ public final class SystemKeyspace
SSTableActivity,
SizeEstimates,
AvailableRanges,
+ MaterializedViewsBuildsInProgress,
+ BuiltMaterializedViews,
LegacyKeyspaces,
LegacyColumnfamilies,
LegacyColumns,
@@ -493,6 +515,82 @@ public final class SystemKeyspace
return CompactionHistoryTabularData.from(queryResultSet);
}
+    /**
+     * Checks the built-views marker table.
+     *
+     * @return true if a "built" marker row exists for the given keyspace and view
+     */
+    public static boolean isViewBuilt(String keyspaceName, String viewName)
+    {
+        String query = String.format("SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?",
+                                     NAME, BUILT_MATERIALIZEDVIEWS);
+        UntypedResultSet rows = executeInternal(query, keyspaceName, viewName);
+        return !rows.isEmpty();
+    }
+
+    /**
+     * Records that the given view has been fully built by inserting its marker row, then flushes the
+     * marker table right away.
+     */
+    public static void setMaterializedViewBuilt(String keyspaceName, String viewName)
+    {
+        String insert = String.format("INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)",
+                                      NAME, BUILT_MATERIALIZEDVIEWS);
+        executeInternal(insert, keyspaceName, viewName);
+        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
+    }
+
+
+    /**
+     * Clears all build bookkeeping for a dropped view. It deletes the view's in-progress build row and its
+     * "built" marker row, flushing each table after its delete.
+     */
+    public static void setMaterializedViewRemoved(String keyspaceName, String viewName)
+    {
+        // "%s" (lower-case conversion) so the keyspace name is used verbatim, as in every other query here.
+        String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
+        executeInternal(String.format(buildReq, NAME, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
+        forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
+
+        String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?";
+        executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
+        forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
+    }
+
+    /**
+     * Registers the start of a view build by writing its generation number into the in-progress table.
+     * Unlike the other status methods, this does not flush.
+     */
+    public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber)
+    {
+        String insert = String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)",
+                                      MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
+        executeInternal(insert, ksname, viewName, generationNumber);
+    }
+
+    /**
+     * Marks a view build as complete. It records the view as built and then removes its in-progress row.
+     */
+    public static void finishMaterializedViewBuildStatus(String ksname, String viewName)
+    {
+        // Persist the "built" marker before deleting the progress row. setMaterializedViewBuilt already
+        // flushes the built-views table, so no second flush is needed here. If we fail between the two steps,
+        // the view is already recorded as built. Deleting the progress row first would instead lose the last
+        // checkpoint, and the build would have to restart from the beginning.
+        setMaterializedViewBuilt(ksname, viewName);
+        executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+        forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
+    }
+
+    /**
+     * Checkpoints a view build by storing {@code token} as the build's last_token. The token is stored as
+     * text via the partitioner's token factory, since the column is a varchar.
+     */
+    public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token)
+    {
+        String tokenAsString = StorageService.getPartitioner().getTokenFactory().toString(token);
+        String insert = String.format("INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)",
+                                      MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
+        executeInternal(insert, ksname, viewName, tokenAsString);
+    }
+
+    /**
+     * Reads the checkpoint of an in-progress view build.
+     *
+     * @return (generation number, last checkpointed token), or null if no in-progress row exists. Either
+     *         element of the pair is null when the corresponding column is not set.
+     */
+    public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName)
+    {
+        String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
+        UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+        if (queryResultSet == null || queryResultSet.isEmpty())
+            return null;
+
+        UntypedResultSet.Row row = queryResultSet.one();
+
+        Integer generation = null;
+        Token lastToken = null;
+        if (row.has("generation_number"))
+            generation = row.getInt("generation_number");
+        // Must match the selected column written by updateMaterializedViewBuildStatus ("last_token").
+        if (row.has("last_token"))
+        {
+            Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
+            lastToken = factory.fromString(row.getString("last_token"));
+        }
+
+        return Pair.create(generation, lastToken);
+    }
+
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
index 4f4c88d..20fb6a9 100644
--- a/src/java/org/apache/cassandra/db/WriteType.java
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -24,5 +24,6 @@ public enum WriteType
UNLOGGED_BATCH,
COUNTER,
BATCH_LOG,
- CAS;
+ CAS,
+ MATERIALIZED_VIEW;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bf412d8..3dd6f38 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.index.SecondaryIndexBuilder;
+import org.apache.cassandra.db.view.MaterializedViewBuilder;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
@@ -1365,6 +1366,31 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+    /**
+     * Runs {@code builder} on the compaction executor, registering it with the compaction metrics for the
+     * duration of the build.
+     *
+     * @param builder the view build to run
+     * @return the future of the submitted task, or null if the compaction executor has already been shut down
+     */
+    public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder)
+    {
+        if (executor.isShutdown())
+        {
+            logger.info("Compaction executor has shut down, not submitting materialized view build");
+            return null;
+        }
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                metrics.beginCompaction(builder);
+                try
+                {
+                    builder.run();
+                }
+                finally
+                {
+                    // Always deregister, even if the build throws
+                    metrics.finishCompaction(builder);
+                }
+            }
+        };
+
+        return executor.submit(runnable);
+    }
public int getActiveCompactions()
{
return CompactionMetrics.getCompactions().size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 766eb1b..5e15f33 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -426,7 +426,7 @@ public class CompactionStrategyManager implements INotificationConsumer
return tasks;
}
}
- }, false);
+ }, false, false);
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 5b6ce05..f8f016c 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -35,7 +35,8 @@ public enum OperationType
VERIFY("Verify"),
FLUSH("Flush"),
STREAM("Stream"),
- WRITE("Write");
+ WRITE("Write"),
+ VIEW_BUILD("Materialized view build");
public final String type;
public final String fileName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
new file mode 100644
index 0000000..082c71d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.CFProperties;
+import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
+import org.apache.cassandra.db.CBuilder;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeBackedRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.service.pager.QueryPager;
+
+/**
+ * A Materialized View copies data from a base table into a view table which can be queried independently from the
+ * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure
+ * that if a view needs to be updated, the updates are properly created and fed into the view.
+ *
+ * This class does the job of translating the base row to the view row.
+ *
+ * It handles reading existing state and figuring out what tombstones need to be generated.
+ *
+ * createMutations below is the "main method"
+ *
+ */
+public class MaterializedView
+{
+ /**
+ * The columns should all be updated together, so we use this object as group.
+ */
+ private static class MVColumns
+ {
+ //These are the base column definitions in terms of the *views* partitioning.
+ //Meaning we can see (for example) the partition key of the view contains a clustering key
+ //from the base table.
+ public final List<ColumnDefinition> partitionDefs;
+ public final List<ColumnDefinition> primaryKeyDefs;
+ public final List<ColumnDefinition> baseComplexColumns;
+
+ private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
+ {
+ this.partitionDefs = partitionDefs;
+ this.primaryKeyDefs = primaryKeyDefs;
+ this.baseComplexColumns = baseComplexColumns;
+ }
+ }
+
+ public final String name;
+
+ private final ColumnFamilyStore baseCfs;
+ private ColumnFamilyStore _viewCfs = null;
+
+ private MVColumns columns;
+
+ private final boolean viewHasAllPrimaryKeys;
+ private final boolean includeAll;
+ private MaterializedViewBuilder builder;
+
+ public MaterializedView(MaterializedViewDefinition definition,
+ ColumnFamilyStore baseCfs)
+ {
+ this.baseCfs = baseCfs;
+
+ name = definition.viewName;
+ includeAll = definition.includeAll;
+
+ viewHasAllPrimaryKeys = updateDefinition(definition);
+ }
+
+ /**
+ * Lazily fetch the CFS instance for the view.
+ * We do this lazily to avoid initilization issues.
+ *
+ * @return The views CFS instance
+ */
+ public ColumnFamilyStore getViewCfs()
+ {
+ if (_viewCfs == null)
+ _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name));
+
+ return _viewCfs;
+ }
+
+
+ /**
+ * Lookup column definitions in the base table that correspond to the view columns (should be 1:1)
+ *
+ * Notify caller if all primary keys in the view are ALL primary keys in the base. We do this to simplify
+ * tombstone checks.
+ *
+ * @param columns a list of columns to lookup in the base table
+ * @param definitions lists to populate for the base table definitions
+ * @return true if all view PKs are also Base PKs
+ */
+ private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
+ {
+ boolean allArePrimaryKeys = true;
+ for (ColumnIdentifier identifier : columns)
+ {
+ ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
+ assert cdef != null : "Could not resolve column " + identifier.toString();
+
+ for (List<ColumnDefinition> list : definitions)
+ {
+ list.add(cdef);
+ }
+
+ allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
+ }
+
+ return allArePrimaryKeys;
+ }
+
+ /**
+ * This updates the columns stored which are dependent on the base CFMetaData.
+ *
+ * @return true if the view contains only columns which are part of the base's primary key; false if there is at
+ * least one column which is not.
+ */
+ public boolean updateDefinition(MaterializedViewDefinition definition)
+ {
+ List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size());
+ List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size()
+ + definition.clusteringColumns.size());
+ List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
+
+ // We only add the partition columns to the partitions list, but both partition columns and clustering
+ // columns are added to the primary keys list
+ boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs);
+ boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs);
+
+ for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
+ {
+ if (cdef.isComplex())
+ {
+ baseComplexColumns.add(cdef);
+ }
+ }
+
+ this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns);
+
+ return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
+ }
+
+ /**
+ * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
+ * <ul>
+ * <li>View selects all columns</li>
+ * <li>Update contains any range tombstones</li>
+ * <li>Update touches one of the columns included in the view</li>
+ * </ul>
+ *
+ * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
+ * currently included in the view.
+ *
+ * @return true if {@param partition} modifies a column included in the view
+ */
+ public boolean updateAffectsView(AbstractThreadUnsafePartition partition)
+ {
+ // If we are including all of the columns, then any update will be included
+ if (includeAll)
+ return true;
+
+ // If there are range tombstones, tombstones will also need to be generated for the materialized view
+ // This requires a query of the base rows and generating tombstones for all of those values
+ if (!partition.deletionInfo().isLive())
+ return true;
+
+ // Check whether the update touches any of the columns included in the view
+ for (Row row : partition)
+ {
+ for (ColumnData data : row)
+ {
+ if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Creates the clustering columns for the view based on the specified row and resolver policy
+ *
+ * @param temporalRow The current row
+ * @param resolver The policy to use when selecting versions of cells use
+ * @return The clustering object to use for the view
+ */
+ private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+ {
+ CFMetaData viewCfm = getViewCfs().metadata;
+ int numViewClustering = viewCfm.clusteringColumns().size();
+ CBuilder clustering = CBuilder.create(getViewCfs().getComparator());
+ for (int i = 0; i < numViewClustering; i++)
+ {
+ ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
+ clustering.add(temporalRow.clusteringValue(definition, resolver));
+ }
+
+ return clustering.build();
+ }
+
+ /**
+ * @return Mutation containing a range tombstone for a base partition key and TemporalRow.
+ */
+ private PartitionUpdate createTombstone(TemporalRow temporalRow,
+ DecoratedKey partitionKey,
+ DeletionTime deletionTime,
+ TemporalRow.Resolver resolver,
+ int nowInSec)
+ {
+ CFMetaData viewCfm = getViewCfs().metadata;
+ Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+ builder.newRow(viewClustering(temporalRow, resolver));
+ builder.addRowDeletion(deletionTime);
+ return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
+ }
+
+ /**
+ * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
+ */
+ private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
+ DecoratedKey partitionKey,
+ ColumnDefinition deletedColumn,
+ DeletionTime deletionTime,
+ TemporalRow.Resolver resolver,
+ int nowInSec)
+ {
+
+ CFMetaData viewCfm = getViewCfs().metadata;
+ Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+ builder.newRow(viewClustering(temporalRow, resolver));
+ builder.addComplexDeletion(deletedColumn, deletionTime);
+ return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
+ }
+
+ /**
+ * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
+ * the TemporalRow and its Resolver
+ */
+ private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
+ {
+ List<ColumnDefinition> partitionDefs = this.columns.partitionDefs;
+ Object[] partitionKey = new Object[partitionDefs.size()];
+
+ for (int i = 0; i < partitionKey.length; i++)
+ {
+ ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver);
+
+ if (value == null)
+ return null;
+
+ partitionKey[i] = value;
+ }
+
+ return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
+ .getKeyValidatorAsClusteringComparator()
+ .make(partitionKey)));
+ }
+
+ /**
+ * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary.
+ * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one
+ * mutation is necessary
+ */
+ private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
+ {
+ // Primary Key and Clustering columns do not generate tombstones
+ if (viewHasAllPrimaryKeys)
+ return null;
+
+ boolean hasUpdate = false;
+ List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs;
+ for (ColumnDefinition viewPartitionKeys : primaryKeyDefs)
+ {
+ if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null)
+ hasUpdate = true;
+ }
+
+ if (!hasUpdate)
+ return null;
+
+ TemporalRow.Resolver resolver = TemporalRow.earliest;
+ return createTombstone(temporalRow,
+ viewPartitionKey(temporalRow, resolver),
+ new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec),
+ resolver,
+ temporalRow.nowInSec);
+ }
+
+ /**
+ * @return Mutation which is the transformed base table mutation for the materialized view.
+ */
+ private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
+ {
+ TemporalRow.Resolver resolver = TemporalRow.latest;
+
+ DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
+ ColumnFamilyStore viewCfs = getViewCfs();
+
+ if (partitionKey == null)
+ {
+ // Not having a partition key means we aren't updating anything
+ return null;
+ }
+
+ Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
+
+ CBuilder clustering = CBuilder.create(viewCfs.getComparator());
+ for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++)
+ {
+ clustering.add(temporalRow.clusteringValue(viewCfs.metadata.clusteringColumns().get(i), resolver));
+ }
+ regularBuilder.newRow(clustering.build());
+ regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfs.metadata,
+ temporalRow.viewClusteringTimestamp(),
+ temporalRow.viewClusteringTtl(),
+ temporalRow.viewClusteringLocalDeletionTime()));
+
+ for (ColumnDefinition columnDefinition : viewCfs.metadata.allColumns())
+ {
+ if (columnDefinition.isPrimaryKeyColumn())
+ continue;
+
+ for (Cell cell : temporalRow.values(columnDefinition, resolver))
+ {
+ regularBuilder.addCell(cell);
+ }
+ }
+
+ return PartitionUpdate.singleRowUpdate(viewCfs.metadata, partitionKey, regularBuilder.build());
+ }
+
+ /**
+ * @param partition Update which possibly contains deletion info for which to generate view tombstones.
+ * @return View Tombstones which delete all of the rows which have been removed from the base table with
+ * {@param partition}
+ */
+ private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition)
+ {
+ final TemporalRow.Resolver resolver = TemporalRow.earliest;
+
+ DeletionInfo deletionInfo = partition.deletionInfo();
+
+ List<Mutation> mutations = new ArrayList<>();
+
+ // Check the complex columns to see if there are any which may have tombstones we need to create for the view
+ if (!columns.baseComplexColumns.isEmpty())
+ {
+ for (Row row : partition)
+ {
+ if (!row.hasComplexDeletion())
+ continue;
+
+ TemporalRow temporalRow = rowSet.getClustering(row.clustering());
+
+ assert temporalRow != null;
+
+ for (ColumnDefinition definition : columns.baseComplexColumns)
+ {
+ ComplexColumnData columnData = row.getComplexColumnData(definition);
+
+ if (columnData != null)
+ {
+ DeletionTime time = columnData.complexDeletion();
+ if (!time.isLive())
+ {
+ DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
+ if (targetKey != null)
+ mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
+ }
+ }
+ }
+ }
+ }
+
+ ReadCommand command = null;
+
+ if (!deletionInfo.isLive())
+ {
+ // We have to generate tombstones for all of the affected rows, but we don't have the information in order
+ // to create them. This requires that we perform a read for the entire range that is being tombstoned, and
+ // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an
+ // entire partition of data which is not distributed on a single partition node.
+ DecoratedKey dk = rowSet.dk;
+
+ if (deletionInfo.hasRanges())
+ {
+ SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
+ Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
+ while (tombstones.hasNext())
+ {
+ RangeTombstone tombstone = tombstones.next();
+
+ builder.addSlice(tombstone.deletedSlice());
+ }
+
+ command = builder.build();
+ }
+ else
+ {
+ command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
+ }
+ }
+
+ if (command == null)
+ {
+ SinglePartitionSliceBuilder builder = null;
+ for (Row row : partition)
+ {
+ if (!row.deletion().isLive())
+ {
+ if (builder == null)
+ builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+ builder.addSlice(Slice.make(row.clustering()));
+ }
+ }
+
+ if (builder != null)
+ command = builder.build();
+ }
+
+ if (command != null)
+ {
+ QueryPager pager = command.getPager(null);
+
+ // Add all of the rows which were recovered from the query to the row set
+ while (!pager.isExhausted())
+ {
+ try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+ PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+ {
+ if (!iter.hasNext())
+ break;
+
+ try (RowIterator rowIterator = iter.next())
+ {
+ while (rowIterator.hasNext())
+ {
+ Row row = rowIterator.next();
+ rowSet.addRow(row, false);
+ }
+ }
+ }
+ }
+
+ // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
+ // for the view.
+ for (TemporalRow temporalRow : rowSet)
+ {
+ DeletionTime deletionTime = temporalRow.deletionTime(partition);
+ if (!deletionTime.isLive())
+ {
+ DecoratedKey value = viewPartitionKey(temporalRow, resolver);
+ if (value != null)
+ {
+ PartitionUpdate update = createTombstone(temporalRow, value, deletionTime, resolver, temporalRow.nowInSec);
+ if (update != null)
+ mutations.add(new Mutation(update));
+ }
+ }
+ }
+ }
+
+ return !mutations.isEmpty() ? mutations : null;
+ }
+
+ /**
+ * Read and update temporal rows in the set which have corresponding values stored on the local node
+ */
+ private void readLocalRows(TemporalRow.Set rowSet)
+ {
+ SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
+
+ for (TemporalRow temporalRow : rowSet)
+ builder.addSlice(temporalRow.baseSlice());
+
+ QueryPager pager = builder.build().getPager(null);
+
+ while (!pager.isExhausted())
+ {
+ try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+ PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+ {
+ while (iter.hasNext())
+ {
+ try (RowIterator rows = iter.next())
+ {
+ while (rows.hasNext())
+ {
+ rowSet.addRow(rows.next(), false);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @return Set of rows which are contained in the partition update {@param partition}
+ */
+ private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition)
+ {
+ Set<ColumnIdentifier> columns = new HashSet<>();
+ for (ColumnDefinition def : this.columns.primaryKeyDefs)
+ columns.add(def.name);
+
+ TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columns, key);
+ for (Row row : partition)
+ rowSet.addRow(row, true);
+
+ return rowSet;
+ }
+
+ /**
+ * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
+ * since all of the update will already be present in the base table.
+ * @return View mutations which represent the changes necessary as long as previously created mutations for the view
+ * have been applied successfully. This is based solely on the changes that are necessary given the current
+ * state of the base table and the newly applying partition data.
+ */
+ public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding)
+ {
+ if (!updateAffectsView(partition))
+ return null;
+
+ TemporalRow.Set rowSet = separateRows(key, partition);
+
+ // If we are building the view, we do not want to add old values; they will always be the same
+ if (!isBuilding)
+ readLocalRows(rowSet);
+
+ Collection<Mutation> mutations = null;
+ for (TemporalRow temporalRow : rowSet)
+ {
+ // If we are building, there is no need to check for partition tombstones; those values will not be present
+ // in the partition data
+ if (!isBuilding)
+ {
+ PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow);
+ if (partitionTombstone != null)
+ {
+ if (mutations == null) mutations = new LinkedList<>();
+ mutations.add(new Mutation(partitionTombstone));
+ }
+ }
+
+ PartitionUpdate insert = createUpdatesForInserts(temporalRow);
+ if (insert != null)
+ {
+ if (mutations == null) mutations = new LinkedList<>();
+ mutations.add(new Mutation(insert));
+ }
+ }
+
+ if (!isBuilding)
+ {
+ Collection<Mutation> deletion = createForDeletionInfo(rowSet, partition);
+ if (deletion != null && !deletion.isEmpty())
+ {
+ if (mutations == null) mutations = new LinkedList<>();
+ mutations.addAll(deletion);
+ }
+ }
+
+ return mutations;
+ }
+
+ public synchronized void build()
+ {
+ if (this.builder != null)
+ {
+ this.builder.stop();
+ this.builder = null;
+ }
+
+ this.builder = new MaterializedViewBuilder(baseCfs, this);
+ CompactionManager.instance.submitMaterializedViewBuilder(builder);
+ }
+
+ /**
+ * @return CFMetaData which represents the definition given
+ */
+ public static CFMetaData getCFMetaData(MaterializedViewDefinition definition,
+ CFMetaData baseCf,
+ CFProperties properties)
+ {
+ CFMetaData.Builder viewBuilder = CFMetaData.Builder
+ .createView(baseCf.ksName, definition.viewName);
+
+ ColumnDefinition nonPkTarget = null;
+
+ for (ColumnIdentifier targetIdentifier : definition.partitionColumns)
+ {
+ ColumnDefinition target = baseCf.getColumnDefinition(targetIdentifier);
+ if (!target.isPartitionKey())
+ nonPkTarget = target;
+
+ viewBuilder.addPartitionKey(target.name, properties.getReversableType(targetIdentifier, target.type));
+ }
+
+ Collection<ColumnDefinition> included = new ArrayList<>();
+ for(ColumnIdentifier identifier : definition.included)
+ {
+ ColumnDefinition cfDef = baseCf.getColumnDefinition(identifier);
+ assert cfDef != null;
+ included.add(cfDef);
+ }
+
+ boolean includeAll = included.isEmpty();
+
+ for (ColumnIdentifier ident : definition.clusteringColumns)
+ {
+ ColumnDefinition column = baseCf.getColumnDefinition(ident);
+ viewBuilder.addClusteringColumn(ident, properties.getReversableType(ident, column.type));
+ }
+
+ for (ColumnDefinition column : baseCf.partitionColumns().regulars.columns)
+ {
+ if (column != nonPkTarget && (includeAll || included.contains(column)))
+ {
+ viewBuilder.addRegularColumn(column.name, column.type);
+ }
+ }
+
+ //Add any extra clustering columns
+ for (ColumnDefinition column : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns()))
+ {
+ if ( (!definition.partitionColumns.contains(column.name) && !definition.clusteringColumns.contains(column.name)) &&
+ (includeAll || included.contains(column)) )
+ {
+ viewBuilder.addRegularColumn(column.name, column.type);
+ }
+ }
+
+ CFMetaData cfm = viewBuilder.build();
+ properties.properties.applyToCFMetadata(cfm);
+
+ return cfm;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
new file mode 100644
index 0000000..e8842ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class MaterializedViewBuilder extends CompactionInfo.Holder
+{
+    private final ColumnFamilyStore baseCfs;
+    private final MaterializedView view;
+    private final UUID compactionId;
+    // Last token recorded as built by run(); also read by getCompactionInfo()
+    private volatile Token prevToken = null;
+
+    private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class);
+
+    private volatile boolean isStopped = false;
+
+    public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view)
+    {
+        this.baseCfs = baseCfs;
+        this.view = view;
+        compactionId = UUIDGen.getTimeUUID();
+    }
+
+    /**
+     * Reads the whole base partition for {@code key}, page by page, and writes the view mutations generated for
+     * each page.
+     */
+    private void buildKey(DecoratedKey key)
+    {
+        QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
+
+        while (!pager.isExhausted())
+        {
+            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
+                 PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
+            {
+                if (!partitionIterator.hasNext())
+                    return;
+
+                try (RowIterator rowIterator = partitionIterator.next())
+                {
+                    Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
+
+                    if (mutations != null)
+                    {
+                        // Keep paging after a successful write: returning/breaking here would leave every page
+                        // after the first one of a large partition out of the view.
+                        try
+                        {
+                            StorageProxy.mutateMV(key.getKey(), mutations);
+                        }
+                        catch (WriteTimeoutException ex)
+                        {
+                            NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES)
+                                        .warn("Encountered write timeout when building materialized view {}, the entries were stored in the batchlog and will be replayed at another time", view.name);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the view from the base table's canonical sstables, resuming after the last recorded token when a
+     * previous build of this view was interrupted. On failure a new builder is rescheduled after 5 minutes.
+     */
+    public void run()
+    {
+        String ksname = baseCfs.metadata.ksName, viewName = view.name;
+
+        if (SystemKeyspace.isViewBuilt(ksname, viewName))
+            return;
+
+        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
+        final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName);
+        Token lastToken;
+        Function<View, Iterable<SSTableReader>> function;
+        if (buildStatus == null)
+        {
+            // Fresh build: flush, then remember the highest sstable generation included in this build
+            baseCfs.forceBlockingFlush();
+            function = View.select(SSTableSet.CANONICAL);
+            int generation = Integer.MIN_VALUE;
+
+            try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
+            {
+                for (SSTableReader reader : temp)
+                {
+                    generation = Math.max(reader.descriptor.generation, generation);
+                }
+            }
+
+            SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation);
+            lastToken = null;
+        }
+        else
+        {
+            // Resumed build: only consider sstables up to the generation recorded when the build began
+            function = new Function<View, Iterable<SSTableReader>>()
+            {
+                @Nullable
+                public Iterable<SSTableReader> apply(View view)
+                {
+                    Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view);
+                    if (readers != null)
+                        return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
+                    return null;
+                }
+            };
+            lastToken = buildStatus.right;
+        }
+
+        prevToken = lastToken;
+        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
+             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
+        {
+            while (!isStopped && iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                Token token = key.getToken();
+                // Skip keys up to and including the token a previous build already reached
+                if (lastToken == null || lastToken.compareTo(token) < 0)
+                {
+                    for (Range<Token> range : ranges)
+                    {
+                        if (range.contains(token))
+                        {
+                            buildKey(key);
+
+                            if (prevToken == null || prevToken.compareTo(token) != 0)
+                            {
+                                SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken());
+                                prevToken = token;
+                            }
+                            // The key only needs to be built once, even if it were contained in several ranges
+                            break;
+                        }
+                    }
+                    lastToken = null;
+                }
+            }
+
+            // A stopped build has not seen every key: keep its progress so it can resume instead of marking it done
+            if (!isStopped)
+                SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName);
+
+        }
+        catch (Exception e)
+        {
+            final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view);
+            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder),
+                                                         5,
+                                                         TimeUnit.MINUTES);
+            logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
+        }
+    }
+
+    @Override
+    public CompactionInfo getCompactionInfo()
+    {
+        long rangesLeft = 0, rangesTotal = 0;
+        Token lastToken = prevToken;
+
+        // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
+        // percentage of a range covered by a second range, this is the best approximation that we can calculate.
+        // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
+        // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
+        // has.
+        // NOTE(review): rangesLeft ends up counting the ranges iterated *after* the one containing lastToken, yet it
+        // is passed as the third CompactionInfo argument; if that argument is the completed amount, this progress
+        // looks inverted - confirm against CompactionInfo.
+        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
+        {
+            rangesLeft++;
+            rangesTotal++;
+            // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
+            // end of the method.
+            if (lastToken == null || range.contains(lastToken))
+                rangesLeft = 0;
+        }
+        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
+    }
+
+    /**
+     * Requests that {@link #run()} stop after the key it is currently building; the build is then not marked as
+     * finished.
+     */
+    public void stop()
+    {
+        isStopped = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c43775c/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
new file mode 100644
index 0000000..7f97728
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+
+ /**
+  * Manages the {@link MaterializedView}s for a single base {@link ColumnFamilyStore}. All of the materialized views
+  * for that table are created when this manager is initialized.
+  *
+  * The main purposes of the manager are:
+  * - to provide a single location where updates are vetted to see whether they affect any views
+  *   ({@link MaterializedViewManager#updateAffectsView(PartitionUpdate)});
+  * - to provide locks that keep concurrent updates from producing incoherent updates in the view
+  *   ({@link MaterializedViewManager#acquireLockFor(ByteBuffer)});
+  * - to effect changes on the views.
+  */
+ public class MaterializedViewManager
+ {
+     // Static, so one striped lock set is shared by every manager; each key is mapped onto one stripe.
+     private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);
+
+     // Registered views keyed by view name. The skip-list map iterates in name order, and its views tolerate
+     // concurrent additions/removals (weakly consistent iteration).
+     private final ConcurrentNavigableMap<String, MaterializedView> viewsByName;
+
+     private final ColumnFamilyStore baseCfs;
+
+     public MaterializedViewManager(ColumnFamilyStore baseCfs)
+     {
+         this.viewsByName = new ConcurrentSkipListMap<>();
+
+         this.baseCfs = baseCfs;
+     }
+
+     /**
+      * @return a live, name-ordered view of all materialized views currently registered with this manager
+      */
+     public Iterable<MaterializedView> allViews()
+     {
+         return viewsByName.values();
+     }
+
+     /**
+      * @return a snapshot list holding the {@link ColumnFamilyStore} of every registered view
+      */
+     public Iterable<ColumnFamilyStore> allViewsCfs()
+     {
+         List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
+         for (MaterializedView view : allViews())
+             viewColumnFamilies.add(view.getViewCfs());
+         return viewColumnFamilies;
+     }
+
+     /**
+      * Registers the views declared in the base table's metadata (delegates to {@link #reload()}).
+      */
+     public void init()
+     {
+         reload();
+     }
+
+     /**
+      * Removes every registered view through {@link #removeMaterializedView(String)}.
+      */
+     public void invalidate()
+     {
+         // Removing while iterating is safe here: the skip-list map's views are weakly consistent.
+         for (MaterializedView view : allViews())
+             removeMaterializedView(view.name);
+     }
+
+     /**
+      * Brings the registered views in line with the base table's current metadata, in three steps:
+      * 1. views that are no longer declared are removed;
+      * 2. newly declared views are added;
+      * 3. {@code build()} and then {@code updateDefinition(...)} are called on every view.
+      */
+     public void reload()
+     {
+         Map<String, MaterializedViewDefinition> newViewsByName = new HashMap<>();
+         for (MaterializedViewDefinition definition : baseCfs.metadata.getMaterializedViews())
+             newViewsByName.put(definition.viewName, definition);
+
+         for (String viewName : viewsByName.keySet())
+         {
+             if (!newViewsByName.containsKey(viewName))
+                 removeMaterializedView(viewName);
+         }
+
+         for (Map.Entry<String, MaterializedViewDefinition> entry : newViewsByName.entrySet())
+         {
+             if (!viewsByName.containsKey(entry.getKey()))
+                 addMaterializedView(entry.getValue());
+         }
+
+         for (MaterializedView view : allViews())
+         {
+             view.build();
+             // Hand each view the definition taken from the base table's metadata
+             view.updateDefinition(newViewsByName.get(view.name));
+         }
+     }
+
+     /**
+      * Calls {@code build()} on every registered view.
+      */
+     public void buildAllViews()
+     {
+         for (MaterializedView view : allViews())
+             view.build();
+     }
+
+     /**
+      * Unregisters the named view and calls {@code SystemKeyspace.setMaterializedViewRemoved} for it.
+      * Does nothing if no view with that name is registered.
+      *
+      * @param name the view name
+      */
+     public void removeMaterializedView(String name)
+     {
+         MaterializedView view = viewsByName.remove(name);
+
+         if (view == null)
+             return;
+
+         SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, view.name);
+     }
+
+     /**
+      * Creates a view for {@code definition} on the base table and registers it under its view name.
+      * Any view already registered under that name is replaced.
+      *
+      * @param definition the definition of the view to add
+      */
+     public void addMaterializedView(MaterializedViewDefinition definition)
+     {
+         MaterializedView view = new MaterializedView(definition, baseCfs);
+
+         viewsByName.put(definition.viewName, view);
+     }
+
+     /**
+      * Calculates and pushes updates to the views' replicas. The replicas are determined by
+      * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
+      *
+      * @param key    key of the base-table partition being updated
+      * @param update the base-table update
+      */
+     public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
+     {
+         // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to
+         // the view node.
+         if (!StorageService.instance.isJoined())
+             return;
+
+         // Created lazily so updates that touch no view allocate nothing. An ArrayList suffices because the batch is
+         // only appended to and then handed off as a whole.
+         List<Mutation> mutations = null;
+         for (MaterializedView view : allViews())
+         {
+             Collection<Mutation> viewMutations = view.createMutations(key, update, false);
+             if (viewMutations != null && !viewMutations.isEmpty())
+             {
+                 if (mutations == null)
+                     mutations = new ArrayList<>();
+                 mutations.addAll(viewMutations);
+             }
+         }
+
+         if (mutations != null)
+             StorageProxy.mutateMV(key, mutations);
+     }
+
+     /**
+      * @param upd the base-table update to check
+      * @return true if at least one registered view reports that {@code upd} affects it
+      */
+     public boolean updateAffectsView(PartitionUpdate upd)
+     {
+         for (MaterializedView view : allViews())
+         {
+             if (view.updateAffectsView(upd))
+                 return true;
+         }
+         return false;
+     }
+
+     /**
+      * Tries, without blocking, to take the lock stripe that guards {@code key}.
+      *
+      * @param key the key to lock
+      * @return the acquired lock, which the caller must unlock, or null if it could not be acquired immediately
+      */
+     public static Lock acquireLockFor(ByteBuffer key)
+     {
+         Lock lock = LOCKS.get(key);
+         return lock.tryLock() ? lock : null;
+     }
+
+     /**
+      * Checks whether any partition update in {@code mutations} affects a view of its table.
+      *
+      * @param mutations the mutations to inspect
+      * @param ignoreRf1 if true, updates to keyspaces with a replication factor of 1 are skipped
+      * @return true as soon as one affecting partition update is found, false otherwise
+      */
+     public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean ignoreRf1)
+     {
+         for (IMutation mutation : mutations)
+         {
+             for (PartitionUpdate cf : mutation.getPartitionUpdates())
+             {
+                 Keyspace keyspace = Keyspace.open(cf.metadata().ksName);
+
+                 if (ignoreRf1 && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
+                     continue;
+
+                 MaterializedViewManager viewManager = keyspace.getColumnFamilyStore(cf.metadata().cfId).materializedViewManager;
+                 if (viewManager.updateAffectsView(cf))
+                     return true;
+             }
+         }
+
+         return false;
+     }
+
+     /**
+      * Calls {@code forceBlockingFlush()} on the table of every registered view.
+      */
+     public void forceBlockingFlush()
+     {
+         for (ColumnFamilyStore viewCfs : allViewsCfs())
+             viewCfs.forceBlockingFlush();
+     }
+
+     /**
+      * Calls {@code dumpMemtable()} on the table of every registered view.
+      */
+     public void dumpMemtables()
+     {
+         for (ColumnFamilyStore viewCfs : allViewsCfs())
+             viewCfs.dumpMemtable();
+     }
+
+     /**
+      * For the table of every registered view, calls {@code discardSSTables(truncatedAt)} and saves a truncation
+      * record with the returned replay position.
+      *
+      * @param truncatedAt the truncation time passed through to each view table
+      */
+     public void truncateBlocking(long truncatedAt)
+     {
+         for (ColumnFamilyStore viewCfs : allViewsCfs())
+         {
+             ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
+             SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
+         }
+     }
+ }