You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2015/07/30 00:19:47 UTC
[5/6] cassandra git commit: Revert "Materialized Views"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
deleted file mode 100644
index 2ddc6ca..0000000
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-public abstract class AbstractReadCommandBuilder
-{
- protected final ColumnFamilyStore cfs;
- protected int nowInSeconds;
-
- private int cqlLimit = -1;
- private int pagingLimit = -1;
- protected boolean reversed = false;
-
- protected Set<ColumnIdentifier> columns;
- protected final RowFilter filter = RowFilter.create();
-
- private Slice.Bound lowerClusteringBound;
- private Slice.Bound upperClusteringBound;
-
- private NavigableSet<Clustering> clusterings;
-
- // Use Util.cmd() instead of this ctor directly
- AbstractReadCommandBuilder(ColumnFamilyStore cfs)
- {
- this.cfs = cfs;
- this.nowInSeconds = FBUtilities.nowInSeconds();
- }
-
- public AbstractReadCommandBuilder withNowInSeconds(int nowInSec)
- {
- this.nowInSeconds = nowInSec;
- return this;
- }
-
- public AbstractReadCommandBuilder fromIncl(Object... values)
- {
- assert lowerClusteringBound == null && clusterings == null;
- this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values);
- return this;
- }
-
- public AbstractReadCommandBuilder fromExcl(Object... values)
- {
- assert lowerClusteringBound == null && clusterings == null;
- this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values);
- return this;
- }
-
- public AbstractReadCommandBuilder toIncl(Object... values)
- {
- assert upperClusteringBound == null && clusterings == null;
- this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values);
- return this;
- }
-
- public AbstractReadCommandBuilder toExcl(Object... values)
- {
- assert upperClusteringBound == null && clusterings == null;
- this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values);
- return this;
- }
-
- public AbstractReadCommandBuilder includeRow(Object... values)
- {
- assert lowerClusteringBound == null && upperClusteringBound == null;
-
- if (this.clusterings == null)
- this.clusterings = new TreeSet<>(cfs.metadata.comparator);
-
- this.clusterings.add(cfs.metadata.comparator.make(values));
- return this;
- }
-
- public AbstractReadCommandBuilder reverse()
- {
- this.reversed = true;
- return this;
- }
-
- public AbstractReadCommandBuilder withLimit(int newLimit)
- {
- this.cqlLimit = newLimit;
- return this;
- }
-
- public AbstractReadCommandBuilder withPagingLimit(int newLimit)
- {
- this.pagingLimit = newLimit;
- return this;
- }
-
- public AbstractReadCommandBuilder columns(String... columns)
- {
- if (this.columns == null)
- this.columns = new HashSet<>();
-
- for (String column : columns)
- this.columns.add(ColumnIdentifier.getInterned(column, true));
- return this;
- }
-
- private ByteBuffer bb(Object value, AbstractType<?> type)
- {
- return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value);
- }
-
- private AbstractType<?> forValues(AbstractType<?> collectionType)
- {
- assert collectionType instanceof CollectionType;
- CollectionType ct = (CollectionType)collectionType;
- switch (ct.kind)
- {
- case LIST:
- case MAP:
- return ct.valueComparator();
- case SET:
- return ct.nameComparator();
- }
- throw new AssertionError();
- }
-
- private AbstractType<?> forKeys(AbstractType<?> collectionType)
- {
- assert collectionType instanceof CollectionType;
- CollectionType ct = (CollectionType)collectionType;
- switch (ct.kind)
- {
- case LIST:
- case MAP:
- return ct.nameComparator();
- }
- throw new AssertionError();
- }
-
- public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value)
- {
- ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true));
- assert def != null;
-
- AbstractType<?> type = def.type;
- if (op == Operator.CONTAINS)
- type = forValues(type);
- else if (op == Operator.CONTAINS_KEY)
- type = forKeys(type);
-
- this.filter.add(def, op, bb(value, type));
- return this;
- }
-
- protected ColumnFilter makeColumnFilter()
- {
- if (columns == null || columns.isEmpty())
- return ColumnFilter.all(cfs.metadata);
-
- ColumnFilter.Builder filter = ColumnFilter.selectionBuilder();
- for (ColumnIdentifier column : columns)
- filter.add(cfs.metadata.getColumnDefinition(column));
- return filter.build();
- }
-
- protected ClusteringIndexFilter makeFilter()
- {
- if (clusterings != null)
- {
- return new ClusteringIndexNamesFilter(clusterings, reversed);
- }
- else
- {
- Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound,
- upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound);
- return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed);
- }
- }
-
- protected DataLimits makeLimits()
- {
- DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit);
- if (pagingLimit >= 0)
- limits = limits.forPaging(pagingLimit);
- return limits;
- }
-
- public abstract ReadCommand build();
-
- public static class SinglePartitionBuilder extends AbstractReadCommandBuilder
- {
- private final DecoratedKey partitionKey;
-
- public SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key)
- {
- super(cfs);
- this.partitionKey = key;
- }
-
- @Override
- public ReadCommand build()
- {
- return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
- }
- }
-
- public static class SinglePartitionSliceBuilder extends AbstractReadCommandBuilder
- {
- private final DecoratedKey partitionKey;
- private Slices.Builder sliceBuilder;
-
- public SinglePartitionSliceBuilder(ColumnFamilyStore cfs, DecoratedKey key)
- {
- super(cfs);
- this.partitionKey = key;
- sliceBuilder = new Slices.Builder(cfs.getComparator());
- }
-
- public SinglePartitionSliceBuilder addSlice(Slice slice)
- {
- sliceBuilder.add(slice);
- return this;
- }
-
- @Override
- protected ClusteringIndexFilter makeFilter()
- {
- return new ClusteringIndexSliceFilter(sliceBuilder.build(), reversed);
- }
-
- @Override
- public ReadCommand build()
- {
- return SinglePartitionSliceCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
- }
- }
-
- public static class PartitionRangeBuilder extends AbstractReadCommandBuilder
- {
- private DecoratedKey startKey;
- private boolean startInclusive;
- private DecoratedKey endKey;
- private boolean endInclusive;
-
- public PartitionRangeBuilder(ColumnFamilyStore cfs)
- {
- super(cfs);
- }
-
- public PartitionRangeBuilder fromKeyIncl(Object... values)
- {
- assert startKey == null;
- this.startInclusive = true;
- this.startKey = makeKey(cfs.metadata, values);
- return this;
- }
-
- public PartitionRangeBuilder fromKeyExcl(Object... values)
- {
- assert startKey == null;
- this.startInclusive = false;
- this.startKey = makeKey(cfs.metadata, values);
- return this;
- }
-
- public PartitionRangeBuilder toKeyIncl(Object... values)
- {
- assert endKey == null;
- this.endInclusive = true;
- this.endKey = makeKey(cfs.metadata, values);
- return this;
- }
-
- public PartitionRangeBuilder toKeyExcl(Object... values)
- {
- assert endKey == null;
- this.endInclusive = false;
- this.endKey = makeKey(cfs.metadata, values);
- return this;
- }
-
- @Override
- public ReadCommand build()
- {
- PartitionPosition start = startKey;
- if (start == null)
- {
- start = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
- startInclusive = false;
- }
- PartitionPosition end = endKey;
- if (end == null)
- {
- end = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
- endInclusive = true;
- }
-
- AbstractBounds<PartitionPosition> bounds;
- if (startInclusive && endInclusive)
- bounds = new Bounds<>(start, end);
- else if (startInclusive && !endInclusive)
- bounds = new IncludingExcludingBounds<>(start, end);
- else if (!startInclusive && endInclusive)
- bounds = new Range<>(start, end);
- else
- bounds = new ExcludingBounds<>(start, end);
-
- return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
- }
-
- static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
- {
- if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
- return (DecoratedKey)partitionKey[0];
-
- ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
- return StorageService.getPartitioner().decorateKey(key);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 24da365..6ac132c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -53,7 +53,6 @@ import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.view.MaterializedViewManager;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
@@ -160,7 +159,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
public final SecondaryIndexManager indexManager;
- public final MaterializedViewManager materializedViewManager;
/* These are locally held copies to be changed from the config during runtime */
private volatile DefaultInteger minCompactionThreshold;
@@ -197,7 +195,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
indexManager.reload();
- materializedViewManager.reload();
// If the CF comparator has changed, we need to change the memtable,
// because the old one still aliases the previous comparator.
if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
@@ -334,7 +331,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
this.partitioner = partitioner;
this.directories = directories;
this.indexManager = new SecondaryIndexManager(this);
- this.materializedViewManager = new MaterializedViewManager(this);
this.metric = new TableMetrics(this);
fileIndexGenerator.set(generation);
sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -455,7 +451,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
SystemKeyspace.removeTruncationRecord(metadata.cfId);
data.dropSSTables();
indexManager.invalidate();
- materializedViewManager.invalidate();
invalidateCaches();
}
@@ -577,10 +572,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// must be called after all sstables are loaded since row cache merges all row versions
- public void init()
+ public void initRowCache()
{
- materializedViewManager.init();
-
if (!isRowCacheEnabled())
return;
@@ -1813,7 +1806,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
cfs.data.reset();
return null;
}
- }, true, false);
+ }, true);
}
}
@@ -1841,16 +1834,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// flush the CF being truncated before forcing the new segment
forceBlockingFlush();
- materializedViewManager.forceBlockingFlush();
-
// sleep a little to make sure that our truncatedAt comes after any sstable
// that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
else
{
- dumpMemtable();
- materializedViewManager.dumpMemtables();
+ // just nuke the memtable data w/o writing to disk first
+ synchronized (data)
+ {
+ final Flush flush = new Flush(true);
+ flushExecutor.execute(flush);
+ postFlushExecutor.submit(flush.postFlush);
+ }
}
Runnable truncateRunnable = new Runnable()
@@ -1870,32 +1866,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (SecondaryIndex index : indexManager.getIndexes())
index.truncateBlocking(truncatedAt);
- materializedViewManager.truncateBlocking(truncatedAt);
-
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
logger.debug("cleaning out row cache");
invalidateCaches();
}
};
- runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
+ runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
logger.debug("truncate complete");
}
- /**
- * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
- */
- public void dumpMemtable()
- {
- synchronized (data)
- {
- final Flush flush = new Flush(true);
- flushExecutor.execute(flush);
- postFlushExecutor.submit(flush.postFlush);
- }
- }
-
- public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
{
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
@@ -1903,20 +1884,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
- Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
- ? Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs())
- : concatWithIndexes();
-
- for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+ Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
+ for (ColumnFamilyStore cfs : selfWithIndexes)
cfs.getCompactionStrategyManager().pause();
try
{
// interrupt in-progress compactions
- CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation);
- CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
+ CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation);
+ CompactionManager.instance.waitForCessation(selfWithIndexes);
// doublecheck that we finished, instead of timing out
- for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+ for (ColumnFamilyStore cfs : selfWithIndexes)
{
if (!cfs.getTracker().getCompacting().isEmpty())
{
@@ -1938,7 +1916,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
finally
{
- for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
+ for (ColumnFamilyStore cfs : selfWithIndexes)
cfs.getCompactionStrategyManager().resume();
}
}
@@ -1958,7 +1936,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
};
- return runWithCompactionsDisabled(callable, false, false);
+ return runWithCompactionsDisabled(callable, false);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 78b593b..f37ce66 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -23,15 +23,13 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -39,17 +37,14 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.view.MaterializedViewManager;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.metrics.KeyspaceMetrics;
@@ -75,10 +70,7 @@ public class Keyspace
}
private volatile KeyspaceMetadata metadata;
-
- //OpOrder is defined globally since we need to order writes across
- //Keyspaces in the case of MaterializedViews (batchlog of MV mutations)
- public static final OpOrder writeOrder = new OpOrder();
+ public final OpOrder writeOrder = new OpOrder();
/* ColumnFamilyStore per column family */
private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
@@ -130,7 +122,7 @@ public class Keyspace
// keyspace has to be constructed and in the cache before cacheRow can be called
for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
- cfs.init();
+ cfs.initRowCache();
}
}
}
@@ -360,14 +352,10 @@ public class Keyspace
// CFS being created for the first time, either on server startup or new CF being added.
// We don't worry about races here; startup is safe, and adding multiple idential CFs
// simultaneously is a "don't do that" scenario.
- ColumnFamilyStore newCfs = ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables);
-
- ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, newCfs);
+ ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
// CFS mbean instantiation will error out before we hit this, but in case that changes...
if (oldCfs != null)
throw new IllegalStateException("added multiple mappings for cf id " + cfId);
-
- newCfs.init();
}
else
{
@@ -392,41 +380,11 @@ public class Keyspace
* @param writeCommitLog false to disable commitlog append entirely
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
*/
- public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes)
+ public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
throw new RuntimeException("Testing write failures");
- Lock lock = null;
- boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false);
-
- if (requiresViewUpdate)
- {
- lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey());
-
- if (lock == null)
- {
- if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
- {
- logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
- Tracing.trace("Could not acquire MV lock");
- throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
- }
- else
- {
- //This MV update can't happen right now. so rather than keep this thread busy
- // we will re-apply ourself to the queue and try again later
- StageManager.getStage(Stage.MUTATION).execute(() -> {
- if (writeCommitLog)
- mutation.apply();
- else
- mutation.applyUnsafe();
- });
-
- return;
- }
- }
- }
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group opGroup = writeOrder.start())
{
@@ -443,26 +401,10 @@ public class Keyspace
ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId);
if (cfs == null)
{
- logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().cfId, upd.metadata().ksName, upd.metadata().cfName);
+ logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId);
continue;
}
- if (requiresViewUpdate)
- {
- try
- {
- Tracing.trace("Create materialized view mutations from replica");
- cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd);
- }
- catch (Exception e)
- {
- if (!(e instanceof WriteTimeoutException))
- logger.warn("Encountered exception when creating materialized view mutations", e);
-
- JVMStabilityInspector.inspectThrowable(e);
- }
- }
-
Tracing.trace("Adding to {} memtable", upd.metadata().cfName);
SecondaryIndexManager.Updater updater = updateIndexes
? cfs.indexManager.updaterFor(upd, opGroup, nowInSec)
@@ -470,11 +412,6 @@ public class Keyspace
cfs.apply(upd, updater, opGroup, replayPosition);
}
}
- finally
- {
- if (lock != null)
- lock.unlock();
- }
}
public AbstractReplicationStrategy getReplicationStrategy()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index ace114b..3d49ca6 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -54,8 +54,6 @@ public class Mutation implements IMutation
// map of column family id to mutations for that column family.
private final Map<UUID, PartitionUpdate> modifications;
- // Time at which this mutation was instantiated
- public final long createdAt = System.currentTimeMillis();
public Mutation(String keyspaceName, DecoratedKey key)
{
this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 640e45f..3baa93e 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -22,7 +22,6 @@ import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@ -48,17 +47,10 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
replyTo = InetAddress.getByAddress(from);
}
- try
- {
message.payload.apply();
WriteResponse response = new WriteResponse();
Tracing.trace("Enqueuing response to {}", replyTo);
MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
- }
- catch (WriteTimeoutException wto)
- {
- Tracing.trace("Payload application resulted in WriteTimeout, not replying");
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 7ea946b..7bfd552 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -101,8 +101,6 @@ public final class SystemKeyspace
public static final String SSTABLE_ACTIVITY = "sstable_activity";
public static final String SIZE_ESTIMATES = "size_estimates";
public static final String AVAILABLE_RANGES = "available_ranges";
- public static final String MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS = "materializedviews_builds_in_progress";
- public static final String BUILT_MATERIALIZEDVIEWS = "built_materializedviews";
@Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
@Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
@@ -263,24 +261,6 @@ public final class SystemKeyspace
+ "ranges set<blob>,"
+ "PRIMARY KEY ((keyspace_name)))");
- public static final CFMetaData MaterializedViewsBuildsInProgress =
- compile(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS,
- "materialized views builds current progress",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "view_name text,"
- + "last_token varchar,"
- + "generation_number int,"
- + "PRIMARY KEY ((keyspace_name), view_name))");
-
- public static final CFMetaData BuiltMaterializedViews =
- compile(BUILT_MATERIALIZEDVIEWS,
- "built materialized views",
- "CREATE TABLE \"%s\" ("
- + "keyspace_name text,"
- + "view_name text,"
- + "PRIMARY KEY ((keyspace_name), view_name))");
-
@Deprecated
public static final CFMetaData LegacyKeyspaces =
compile(LEGACY_KEYSPACES,
@@ -421,8 +401,6 @@ public final class SystemKeyspace
SSTableActivity,
SizeEstimates,
AvailableRanges,
- MaterializedViewsBuildsInProgress,
- BuiltMaterializedViews,
LegacyKeyspaces,
LegacyColumnfamilies,
LegacyColumns,
@@ -515,82 +493,6 @@ public final class SystemKeyspace
return CompactionHistoryTabularData.from(queryResultSet);
}
- public static boolean isViewBuilt(String keyspaceName, String viewName)
- {
- String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?";
- UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
- return !result.isEmpty();
- }
-
- public static void setMaterializedViewBuilt(String keyspaceName, String viewName)
- {
- String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)";
- executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
- forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
- }
-
-
- public static void setMaterializedViewRemoved(String keyspaceName, String viewName)
- {
- String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ?";
- executeInternal(String.format(buildReq, NAME, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
- forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
-
- String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?";
- executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName);
- forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
- }
-
- public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber)
- {
- executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS),
- ksname,
- viewName,
- generationNumber);
- }
-
- public static void finishMaterializedViewBuildStatus(String ksname, String viewName)
- {
- // We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed
- // materialized view build.
- // If we flush the delete first, we'll have to restart from the beginning.
- // Also, if the build succeeded, but the materialized view build failed, we will be able to skip the
- // materialized view build check next boot.
- setMaterializedViewBuilt(ksname, viewName);
- forceBlockingFlush(BUILT_MATERIALIZEDVIEWS);
- executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
- forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS);
- }
-
- public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token)
- {
- String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
- Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
- executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
- }
-
- public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName)
- {
- String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
- UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
- if (queryResultSet == null || queryResultSet.isEmpty())
- return null;
-
- UntypedResultSet.Row row = queryResultSet.one();
-
- Integer generation = null;
- Token lastKey = null;
- if (row.has("generation_number"))
- generation = row.getInt("generation_number");
- if (row.has("last_key"))
- {
- Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory();
- lastKey = factory.fromString(row.getString("last_key"));
- }
-
- return Pair.create(generation, lastKey);
- }
-
public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
index 20fb6a9..4f4c88d 100644
--- a/src/java/org/apache/cassandra/db/WriteType.java
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -24,6 +24,5 @@ public enum WriteType
UNLOGGED_BATCH,
COUNTER,
BATCH_LOG,
- CAS,
- MATERIALIZED_VIEW;
+ CAS;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3dd6f38..bf412d8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -58,7 +58,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.index.SecondaryIndexBuilder;
-import org.apache.cassandra.db.view.MaterializedViewBuilder;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
@@ -1366,31 +1365,6 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder)
- {
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- metrics.beginCompaction(builder);
- try
- {
- builder.run();
- }
- finally
- {
- metrics.finishCompaction(builder);
- }
- }
- };
- if (executor.isShutdown())
- {
- logger.info("Compaction executor has shut down, not submitting index build");
- return null;
- }
-
- return executor.submit(runnable);
- }
public int getActiveCompactions()
{
return CompactionMetrics.getCompactions().size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 5e15f33..766eb1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -426,7 +426,7 @@ public class CompactionStrategyManager implements INotificationConsumer
return tasks;
}
}
- }, false, false);
+ }, false);
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index f8f016c..5b6ce05 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -35,8 +35,7 @@ public enum OperationType
VERIFY("Verify"),
FLUSH("Flush"),
STREAM("Stream"),
- WRITE("Write"),
- VIEW_BUILD("Materialized view build");
+ WRITE("Write");
public final String type;
public final String fileName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
deleted file mode 100644
index 082c71d..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ /dev/null
@@ -1,691 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.statements.CFProperties;
-import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeBackedRow;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.ColumnData;
-import org.apache.cassandra.db.rows.ComplexColumnData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.service.pager.QueryPager;
-
-/**
- * A Materialized View copies data from a base table into a view table which can be queried independently from the
- * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure
- * that if a view needs to be updated, the updates are properly created and fed into the view.
- *
- * This class does the job of translating the base row to the view row.
- *
- * It handles reading existing state and figuring out what tombstones need to be generated.
- *
- * createMutations below is the "main method"
- *
- */
-public class MaterializedView
-{
- /**
- * The columns should all be updated together, so we use this object as group.
- */
- private static class MVColumns
- {
- //These are the base column definitions in terms of the *views* partitioning.
- //Meaning we can see (for example) the partition key of the view contains a clustering key
- //from the base table.
- public final List<ColumnDefinition> partitionDefs;
- public final List<ColumnDefinition> primaryKeyDefs;
- public final List<ColumnDefinition> baseComplexColumns;
-
- private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
- {
- this.partitionDefs = partitionDefs;
- this.primaryKeyDefs = primaryKeyDefs;
- this.baseComplexColumns = baseComplexColumns;
- }
- }
-
- public final String name;
-
- private final ColumnFamilyStore baseCfs;
- private ColumnFamilyStore _viewCfs = null;
-
- private MVColumns columns;
-
- private final boolean viewHasAllPrimaryKeys;
- private final boolean includeAll;
- private MaterializedViewBuilder builder;
-
- public MaterializedView(MaterializedViewDefinition definition,
- ColumnFamilyStore baseCfs)
- {
- this.baseCfs = baseCfs;
-
- name = definition.viewName;
- includeAll = definition.includeAll;
-
- viewHasAllPrimaryKeys = updateDefinition(definition);
- }
-
- /**
- * Lazily fetch the CFS instance for the view.
- * We do this lazily to avoid initilization issues.
- *
- * @return The views CFS instance
- */
- public ColumnFamilyStore getViewCfs()
- {
- if (_viewCfs == null)
- _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name));
-
- return _viewCfs;
- }
-
-
- /**
- * Lookup column definitions in the base table that correspond to the view columns (should be 1:1)
- *
- * Notify caller if all primary keys in the view are ALL primary keys in the base. We do this to simplify
- * tombstone checks.
- *
- * @param columns a list of columns to lookup in the base table
- * @param definitions lists to populate for the base table definitions
- * @return true if all view PKs are also Base PKs
- */
- private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
- {
- boolean allArePrimaryKeys = true;
- for (ColumnIdentifier identifier : columns)
- {
- ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
- assert cdef != null : "Could not resolve column " + identifier.toString();
-
- for (List<ColumnDefinition> list : definitions)
- {
- list.add(cdef);
- }
-
- allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
- }
-
- return allArePrimaryKeys;
- }
-
- /**
- * This updates the columns stored which are dependent on the base CFMetaData.
- *
- * @return true if the view contains only columns which are part of the base's primary key; false if there is at
- * least one column which is not.
- */
- public boolean updateDefinition(MaterializedViewDefinition definition)
- {
- List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size());
- List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size()
- + definition.clusteringColumns.size());
- List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
-
- // We only add the partition columns to the partitions list, but both partition columns and clustering
- // columns are added to the primary keys list
- boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs);
- boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs);
-
- for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
- {
- if (cdef.isComplex())
- {
- baseComplexColumns.add(cdef);
- }
- }
-
- this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns);
-
- return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
- }
-
- /**
- * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
- * <ul>
- * <li>View selects all columns</li>
- * <li>Update contains any range tombstones</li>
- * <li>Update touches one of the columns included in the view</li>
- * </ul>
- *
- * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
- * currently included in the view.
- *
- * @return true if {@param partition} modifies a column included in the view
- */
- public boolean updateAffectsView(AbstractThreadUnsafePartition partition)
- {
- // If we are including all of the columns, then any update will be included
- if (includeAll)
- return true;
-
- // If there are range tombstones, tombstones will also need to be generated for the materialized view
- // This requires a query of the base rows and generating tombstones for all of those values
- if (!partition.deletionInfo().isLive())
- return true;
-
- // Check whether the update touches any of the columns included in the view
- for (Row row : partition)
- {
- for (ColumnData data : row)
- {
- if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null)
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * Creates the clustering columns for the view based on the specified row and resolver policy
- *
- * @param temporalRow The current row
- * @param resolver The policy to use when selecting versions of cells use
- * @return The clustering object to use for the view
- */
- private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
- {
- CFMetaData viewCfm = getViewCfs().metadata;
- int numViewClustering = viewCfm.clusteringColumns().size();
- CBuilder clustering = CBuilder.create(getViewCfs().getComparator());
- for (int i = 0; i < numViewClustering; i++)
- {
- ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
- clustering.add(temporalRow.clusteringValue(definition, resolver));
- }
-
- return clustering.build();
- }
-
- /**
- * @return Mutation containing a range tombstone for a base partition key and TemporalRow.
- */
- private PartitionUpdate createTombstone(TemporalRow temporalRow,
- DecoratedKey partitionKey,
- DeletionTime deletionTime,
- TemporalRow.Resolver resolver,
- int nowInSec)
- {
- CFMetaData viewCfm = getViewCfs().metadata;
- Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
- builder.newRow(viewClustering(temporalRow, resolver));
- builder.addRowDeletion(deletionTime);
- return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
- }
-
- /**
- * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
- */
- private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
- DecoratedKey partitionKey,
- ColumnDefinition deletedColumn,
- DeletionTime deletionTime,
- TemporalRow.Resolver resolver,
- int nowInSec)
- {
-
- CFMetaData viewCfm = getViewCfs().metadata;
- Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
- builder.newRow(viewClustering(temporalRow, resolver));
- builder.addComplexDeletion(deletedColumn, deletionTime);
- return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
- }
-
- /**
- * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
- * the TemporalRow and its Resolver
- */
- private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
- {
- List<ColumnDefinition> partitionDefs = this.columns.partitionDefs;
- Object[] partitionKey = new Object[partitionDefs.size()];
-
- for (int i = 0; i < partitionKey.length; i++)
- {
- ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver);
-
- if (value == null)
- return null;
-
- partitionKey[i] = value;
- }
-
- return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata
- .getKeyValidatorAsClusteringComparator()
- .make(partitionKey)));
- }
-
- /**
- * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary.
- * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one
- * mutation is necessary
- */
- private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
- {
- // Primary Key and Clustering columns do not generate tombstones
- if (viewHasAllPrimaryKeys)
- return null;
-
- boolean hasUpdate = false;
- List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs;
- for (ColumnDefinition viewPartitionKeys : primaryKeyDefs)
- {
- if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null)
- hasUpdate = true;
- }
-
- if (!hasUpdate)
- return null;
-
- TemporalRow.Resolver resolver = TemporalRow.earliest;
- return createTombstone(temporalRow,
- viewPartitionKey(temporalRow, resolver),
- new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec),
- resolver,
- temporalRow.nowInSec);
- }
-
- /**
- * @return Mutation which is the transformed base table mutation for the materialized view.
- */
- private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
- {
- TemporalRow.Resolver resolver = TemporalRow.latest;
-
- DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver);
- ColumnFamilyStore viewCfs = getViewCfs();
-
- if (partitionKey == null)
- {
- // Not having a partition key means we aren't updating anything
- return null;
- }
-
- Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
-
- CBuilder clustering = CBuilder.create(viewCfs.getComparator());
- for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++)
- {
- clustering.add(temporalRow.clusteringValue(viewCfs.metadata.clusteringColumns().get(i), resolver));
- }
- regularBuilder.newRow(clustering.build());
- regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfs.metadata,
- temporalRow.viewClusteringTimestamp(),
- temporalRow.viewClusteringTtl(),
- temporalRow.viewClusteringLocalDeletionTime()));
-
- for (ColumnDefinition columnDefinition : viewCfs.metadata.allColumns())
- {
- if (columnDefinition.isPrimaryKeyColumn())
- continue;
-
- for (Cell cell : temporalRow.values(columnDefinition, resolver))
- {
- regularBuilder.addCell(cell);
- }
- }
-
- return PartitionUpdate.singleRowUpdate(viewCfs.metadata, partitionKey, regularBuilder.build());
- }
-
- /**
- * @param partition Update which possibly contains deletion info for which to generate view tombstones.
- * @return View Tombstones which delete all of the rows which have been removed from the base table with
- * {@param partition}
- */
- private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition)
- {
- final TemporalRow.Resolver resolver = TemporalRow.earliest;
-
- DeletionInfo deletionInfo = partition.deletionInfo();
-
- List<Mutation> mutations = new ArrayList<>();
-
- // Check the complex columns to see if there are any which may have tombstones we need to create for the view
- if (!columns.baseComplexColumns.isEmpty())
- {
- for (Row row : partition)
- {
- if (!row.hasComplexDeletion())
- continue;
-
- TemporalRow temporalRow = rowSet.getClustering(row.clustering());
-
- assert temporalRow != null;
-
- for (ColumnDefinition definition : columns.baseComplexColumns)
- {
- ComplexColumnData columnData = row.getComplexColumnData(definition);
-
- if (columnData != null)
- {
- DeletionTime time = columnData.complexDeletion();
- if (!time.isLive())
- {
- DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
- if (targetKey != null)
- mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
- }
- }
- }
- }
- }
-
- ReadCommand command = null;
-
- if (!deletionInfo.isLive())
- {
- // We have to generate tombstones for all of the affected rows, but we don't have the information in order
- // to create them. This requires that we perform a read for the entire range that is being tombstoned, and
- // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an
- // entire partition of data which is not distributed on a single partition node.
- DecoratedKey dk = rowSet.dk;
-
- if (deletionInfo.hasRanges())
- {
- SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
- Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
- while (tombstones.hasNext())
- {
- RangeTombstone tombstone = tombstones.next();
-
- builder.addSlice(tombstone.deletedSlice());
- }
-
- command = builder.build();
- }
- else
- {
- command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
- }
- }
-
- if (command == null)
- {
- SinglePartitionSliceBuilder builder = null;
- for (Row row : partition)
- {
- if (!row.deletion().isLive())
- {
- if (builder == null)
- builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
- builder.addSlice(Slice.make(row.clustering()));
- }
- }
-
- if (builder != null)
- command = builder.build();
- }
-
- if (command != null)
- {
- QueryPager pager = command.getPager(null);
-
- // Add all of the rows which were recovered from the query to the row set
- while (!pager.isExhausted())
- {
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
- {
- if (!iter.hasNext())
- break;
-
- try (RowIterator rowIterator = iter.next())
- {
- while (rowIterator.hasNext())
- {
- Row row = rowIterator.next();
- rowSet.addRow(row, false);
- }
- }
- }
- }
-
- // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
- // for the view.
- for (TemporalRow temporalRow : rowSet)
- {
- DeletionTime deletionTime = temporalRow.deletionTime(partition);
- if (!deletionTime.isLive())
- {
- DecoratedKey value = viewPartitionKey(temporalRow, resolver);
- if (value != null)
- {
- PartitionUpdate update = createTombstone(temporalRow, value, deletionTime, resolver, temporalRow.nowInSec);
- if (update != null)
- mutations.add(new Mutation(update));
- }
- }
- }
- }
-
- return !mutations.isEmpty() ? mutations : null;
- }
-
- /**
- * Read and update temporal rows in the set which have corresponding values stored on the local node
- */
- private void readLocalRows(TemporalRow.Set rowSet)
- {
- SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
-
- for (TemporalRow temporalRow : rowSet)
- builder.addSlice(temporalRow.baseSlice());
-
- QueryPager pager = builder.build().getPager(null);
-
- while (!pager.isExhausted())
- {
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
- {
- while (iter.hasNext())
- {
- try (RowIterator rows = iter.next())
- {
- while (rows.hasNext())
- {
- rowSet.addRow(rows.next(), false);
- }
- }
- }
- }
- }
- }
-
- /**
- * @return Set of rows which are contained in the partition update {@param partition}
- */
- private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition)
- {
- Set<ColumnIdentifier> columns = new HashSet<>();
- for (ColumnDefinition def : this.columns.primaryKeyDefs)
- columns.add(def.name);
-
- TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columns, key);
- for (Row row : partition)
- rowSet.addRow(row, true);
-
- return rowSet;
- }
-
- /**
- * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
- * since all of the update will already be present in the base table.
- * @return View mutations which represent the changes necessary as long as previously created mutations for the view
- * have been applied successfully. This is based solely on the changes that are necessary given the current
- * state of the base table and the newly applying partition data.
- */
- public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding)
- {
- if (!updateAffectsView(partition))
- return null;
-
- TemporalRow.Set rowSet = separateRows(key, partition);
-
- // If we are building the view, we do not want to add old values; they will always be the same
- if (!isBuilding)
- readLocalRows(rowSet);
-
- Collection<Mutation> mutations = null;
- for (TemporalRow temporalRow : rowSet)
- {
- // If we are building, there is no need to check for partition tombstones; those values will not be present
- // in the partition data
- if (!isBuilding)
- {
- PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow);
- if (partitionTombstone != null)
- {
- if (mutations == null) mutations = new LinkedList<>();
- mutations.add(new Mutation(partitionTombstone));
- }
- }
-
- PartitionUpdate insert = createUpdatesForInserts(temporalRow);
- if (insert != null)
- {
- if (mutations == null) mutations = new LinkedList<>();
- mutations.add(new Mutation(insert));
- }
- }
-
- if (!isBuilding)
- {
- Collection<Mutation> deletion = createForDeletionInfo(rowSet, partition);
- if (deletion != null && !deletion.isEmpty())
- {
- if (mutations == null) mutations = new LinkedList<>();
- mutations.addAll(deletion);
- }
- }
-
- return mutations;
- }
-
- public synchronized void build()
- {
- if (this.builder != null)
- {
- this.builder.stop();
- this.builder = null;
- }
-
- this.builder = new MaterializedViewBuilder(baseCfs, this);
- CompactionManager.instance.submitMaterializedViewBuilder(builder);
- }
-
- /**
- * @return CFMetaData which represents the definition given
- */
- public static CFMetaData getCFMetaData(MaterializedViewDefinition definition,
- CFMetaData baseCf,
- CFProperties properties)
- {
- CFMetaData.Builder viewBuilder = CFMetaData.Builder
- .createView(baseCf.ksName, definition.viewName);
-
- ColumnDefinition nonPkTarget = null;
-
- for (ColumnIdentifier targetIdentifier : definition.partitionColumns)
- {
- ColumnDefinition target = baseCf.getColumnDefinition(targetIdentifier);
- if (!target.isPartitionKey())
- nonPkTarget = target;
-
- viewBuilder.addPartitionKey(target.name, properties.getReversableType(targetIdentifier, target.type));
- }
-
- Collection<ColumnDefinition> included = new ArrayList<>();
- for(ColumnIdentifier identifier : definition.included)
- {
- ColumnDefinition cfDef = baseCf.getColumnDefinition(identifier);
- assert cfDef != null;
- included.add(cfDef);
- }
-
- boolean includeAll = included.isEmpty();
-
- for (ColumnIdentifier ident : definition.clusteringColumns)
- {
- ColumnDefinition column = baseCf.getColumnDefinition(ident);
- viewBuilder.addClusteringColumn(ident, properties.getReversableType(ident, column.type));
- }
-
- for (ColumnDefinition column : baseCf.partitionColumns().regulars.columns)
- {
- if (column != nonPkTarget && (includeAll || included.contains(column)))
- {
- viewBuilder.addRegularColumn(column.name, column.type);
- }
- }
-
- //Add any extra clustering columns
- for (ColumnDefinition column : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns()))
- {
- if ( (!definition.partitionColumns.contains(column.name) && !definition.clusteringColumns.contains(column.name)) &&
- (includeAll || included.contains(column)) )
- {
- viewBuilder.addRegularColumn(column.name, column.type);
- }
- }
-
- CFMetaData cfm = viewBuilder.build();
- properties.properties.applyToCFMetadata(cfm);
-
- return cfm;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
deleted file mode 100644
index e8842ed..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.partitions.FilteredPartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-public class MaterializedViewBuilder extends CompactionInfo.Holder
-{
- private final ColumnFamilyStore baseCfs;
- private final MaterializedView view;
- private final UUID compactionId;
- private volatile Token prevToken = null;
-
- private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class);
-
- private volatile boolean isStopped = false;
-
- public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view)
- {
- this.baseCfs = baseCfs;
- this.view = view;
- compactionId = UUIDGen.getTimeUUID();
- }
-
- private void buildKey(DecoratedKey key)
- {
- QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
-
- while (!pager.isExhausted())
- {
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
- {
- if (!partitionIterator.hasNext())
- return;
-
- try (RowIterator rowIterator = partitionIterator.next())
- {
- Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true);
-
- if (mutations != null)
- {
- try
- {
- StorageProxy.mutateMV(key.getKey(), mutations);
- break;
- }
- catch (WriteTimeoutException ex)
- {
- NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES)
- .warn("Encountered write timeout when building materialized view {}, the entries were stored in the batchlog and will be replayed at another time", view.name);
- }
- }
- }
- }
- }
- }
-
- public void run()
- {
- String ksname = baseCfs.metadata.ksName, viewName = view.name;
-
- if (SystemKeyspace.isViewBuilt(ksname, viewName))
- return;
-
- Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
- final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName);
- Token lastToken;
- Function<View, Iterable<SSTableReader>> function;
- if (buildStatus == null)
- {
- baseCfs.forceBlockingFlush();
- function = View.select(SSTableSet.CANONICAL);
- int generation = Integer.MIN_VALUE;
-
- try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs)
- {
- for (SSTableReader reader : temp)
- {
- generation = Math.max(reader.descriptor.generation, generation);
- }
- }
-
- SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation);
- lastToken = null;
- }
- else
- {
- function = new Function<View, Iterable<SSTableReader>>()
- {
- @Nullable
- public Iterable<SSTableReader> apply(View view)
- {
- Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view);
- if (readers != null)
- return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left);
- return null;
- }
- };
- lastToken = buildStatus.right;
- }
-
- prevToken = lastToken;
- try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
- ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
- {
- while (!isStopped && iter.hasNext())
- {
- DecoratedKey key = iter.next();
- Token token = key.getToken();
- if (lastToken == null || lastToken.compareTo(token) < 0)
- {
- for (Range<Token> range : ranges)
- {
- if (range.contains(token))
- {
- buildKey(key);
-
- if (prevToken == null || prevToken.compareTo(token) != 0)
- {
- SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken());
- prevToken = token;
- }
- }
- }
- lastToken = null;
- }
- }
-
- SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName);
-
- }
- catch (Exception e)
- {
- final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view);
- ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder),
- 5,
- TimeUnit.MINUTES);
- logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
- }
- }
-
- public CompactionInfo getCompactionInfo()
- {
- long rangesLeft = 0, rangesTotal = 0;
- Token lastToken = prevToken;
-
- // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
- // percentage of a range covered by a second range, this is the best approximation that we can calculate.
- // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of
- // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node
- // has.
- for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
- {
- rangesLeft++;
- rangesTotal++;
- // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the
- // end of the method.
- if (lastToken == null || range.contains(lastToken))
- rangesLeft = 0;
- }
- return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId);
- }
-
- public void stop()
- {
- isStopped = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
deleted file mode 100644
index 7f97728..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.Lock;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Striped;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-
-/**
- * Manages the {@link MaterializedView}s of a single base {@link ColumnFamilyStore}. The views are registered from the
- * base table's metadata when {@link #init()} (or {@link #reload()}) is called; the constructor registers none.
- *
- * The main purposes of the manager are to provide a single place where updates are checked to see whether they touch
- * any view ({@link MaterializedViewManager#updateAffectsView(PartitionUpdate)}), to provide locks that keep concurrent
- * updates from producing incoherent view updates ({@link MaterializedViewManager#acquireLockFor(ByteBuffer)}), and
- * to effect changes on the views.
- */
-public class MaterializedViewManager
-{
- private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024); // static: one lock pool shared by every manager instance
-
- private final ConcurrentNavigableMap<String, MaterializedView> viewsByName; // registered views, keyed by view name
-
- private final ColumnFamilyStore baseCfs; // the base table whose views this manager tracks
-
- public MaterializedViewManager(ColumnFamilyStore baseCfs) // Starts with no views registered; reload() populates them.
- {
- this.viewsByName = new ConcurrentSkipListMap<>();
-
- this.baseCfs = baseCfs;
- }
-
- public Iterable<MaterializedView> allViews() // Returns the map's values() view directly, not a copy.
- {
- return viewsByName.values();
- }
-
- public Iterable<ColumnFamilyStore> allViewsCfs() // Returns a freshly built list holding each view's ColumnFamilyStore.
- {
- List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>();
- for (MaterializedView view : allViews())
- viewColumnFamilies.add(view.getViewCfs());
- return viewColumnFamilies;
- }
-
- public void init() // Equivalent to reload().
- {
- reload();
- }
-
- public void invalidate() // Unregisters every currently registered view through removeMaterializedView.
- {
- for (MaterializedView view : allViews())
- removeMaterializedView(view.name);
- }
-
- public void reload() // Syncs viewsByName with the base metadata's view definitions, then builds and updates every view.
- {
- Map<String, MaterializedViewDefinition> newViewsByName = new HashMap<>(); // definitions currently listed in the base metadata
- for (MaterializedViewDefinition definition : baseCfs.metadata.getMaterializedViews())
- {
- newViewsByName.put(definition.viewName, definition);
- }
-
- for (String viewName : viewsByName.keySet()) // unregister views that no longer have a definition
- {
- if (!newViewsByName.containsKey(viewName))
- removeMaterializedView(viewName);
- }
-
- for (Map.Entry<String, MaterializedViewDefinition> entry : newViewsByName.entrySet()) // register definitions not yet known
- {
- if (!viewsByName.containsKey(entry.getKey()))
- addMaterializedView(entry.getValue());
- }
-
- for (MaterializedView view : allViews())
- {
- view.build(); // NOTE(review): build() runs before updateDefinition(); confirm that this order is intended
- // We provide the new definition from the base metadata
- view.updateDefinition(newViewsByName.get(view.name));
- }
- }
-
- public void buildAllViews() // Calls build() on every registered view.
- {
- for (MaterializedView view : allViews())
- view.build();
- }
-
- public void removeMaterializedView(String name) // Unregisters the named view; does nothing if it is not registered.
- {
- MaterializedView view = viewsByName.remove(name);
-
- if (view == null)
- return;
-
- SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, view.name); // only reached when a view was actually removed
- }
-
- public void addMaterializedView(MaterializedViewDefinition definition) // Creates a view for the definition and stores it under definition.viewName.
- {
- MaterializedView view = new MaterializedView(definition, baseCfs);
-
- viewsByName.put(definition.viewName, view);
- }
-
- /**
- * Calculates and pushes updates to the view replicas. The replicas are determined by
- * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
- */
- public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException
- {
- // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the
- // view node.
- if (!StorageService.instance.isJoined()) return;
-
- List<Mutation> mutations = null; // created lazily, so it stays null when no view produces any mutation
- for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet())
- {
- Collection<Mutation> viewMutations = view.getValue().createMutations(key, update, false);
- if (viewMutations != null && !viewMutations.isEmpty())
- {
- if (mutations == null)
- mutations = Lists.newLinkedList();
- mutations.addAll(viewMutations);
- }
- }
- if (mutations != null)
- {
- StorageProxy.mutateMV(key, mutations); // the mutations gathered from all views are sent in a single call
- }
- }
-
- public boolean updateAffectsView(PartitionUpdate upd) // True if at least one registered view reports being affected by the update.
- {
- for (MaterializedView view : allViews())
- {
- if (view.updateAffectsView(upd))
- return true;
- }
- return false;
- }
-
- public static Lock acquireLockFor(ByteBuffer key) // Non-blocking: returns the lock, already held via tryLock(), or null if it could not be taken. Nothing in this class unlocks it.
- {
- Lock lock = LOCKS.get(key); // striped lookup; per Guava's Striped documentation distinct keys may map to the same lock
-
- if (lock.tryLock())
- return lock;
-
- return null;
- }
-
- public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean ignoreRf1) // True if any partition update in the mutations affects a view of its own table.
- {
- for (IMutation mutation : mutations)
- {
- for (PartitionUpdate cf : mutation.getPartitionUpdates())
- {
- Keyspace keyspace = Keyspace.open(cf.metadata().ksName);
-
- if (ignoreRf1 && keyspace.getReplicationStrategy().getReplicationFactor() == 1) // with ignoreRf1, updates in keyspaces whose replication factor is 1 are not checked
- continue;
-
- MaterializedViewManager viewManager = keyspace.getColumnFamilyStore(cf.metadata().cfId).materializedViewManager;
- if (viewManager.updateAffectsView(cf))
- return true;
- }
- }
-
- return false;
- }
-
-
- public void forceBlockingFlush() // Calls forceBlockingFlush() on every view's ColumnFamilyStore.
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- viewCfs.forceBlockingFlush();
- }
-
- public void dumpMemtables() // Calls dumpMemtable() on every view's ColumnFamilyStore.
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- viewCfs.dumpMemtable();
- }
-
- public void truncateBlocking(long truncatedAt) // For each view table: calls discardSSTables(truncatedAt), then saves a truncation record with the returned ReplayPosition.
- {
- for (ColumnFamilyStore viewCfs : allViewsCfs())
- {
- ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt);
- SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
- }
- }
-}