You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/07/29 22:16:16 UTC
[3/5] cassandra git commit: Materialized Views
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
new file mode 100644
index 0000000..4dfea75
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.utils.FBUtilities;
+
+public final class MaterializedViewUtils
+{
+ private MaterializedViewUtils()
+ {
+ }
+
+ /**
+ * Calculate the natural endpoint for the view.
+ *
+ * The view natural endpoint is the endpoint which has the same cardinality as this node in the replication factor.
+ * The cardinality is the number at which this node would store a piece of data, given the change in replication
+ * factor.
+ *
+ * For example, if we have the following ring:
+ * A, T1 -> B, T2 -> C, T3 -> A
+ *
+ * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would
+ * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality
+ * for T3 is 2.
+ *
+ * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be:
+ * A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3)
+ * B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
+ * C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
+ *
+ * @throws RuntimeException if this method is called using a base token which does not belong to this replica
+ */
+ public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+ {
+ AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
+
+ String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+ List<InetAddress> localBaseEndpoints = new ArrayList<>();
+ List<InetAddress> localViewEndpoints = new ArrayList<>();
+ for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
+ {
+ if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
+ localBaseEndpoints.add(baseEndpoint);
+ }
+
+ for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
+ {
+ // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
+ if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
+ return viewEndpoint;
+
+ // We have to remove any endpoint which is shared between the base and the view, as it will select itself
+ // and throw off the counts otherwise.
+ if (localBaseEndpoints.contains(viewEndpoint))
+ localBaseEndpoints.remove(viewEndpoint);
+ else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
+ localViewEndpoints.add(viewEndpoint);
+ }
+
+ // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
+ // Since the same replication strategy is used, the same placement should be used and we should get the same
+ // number of replicas for all of the tokens in the ring.
+ assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
+ int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
+ if (baseIdx < 0)
+ throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
+
+ return localViewEndpoints.get(baseIdx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
new file mode 100644
index 0000000..53e4e91
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.CBuilder;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Conflicts;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Represents a single CQL Row in a base table, with both the currently persisted value and the update's value. The
+ * values are stored in timestamp order, but also indicate whether they are from the currently persisted value, allowing a
+ * {@link TemporalRow.Resolver} to resolve if the value is an old value that has been updated; if it sorts after the
+ * update's value, then it does not qualify.
+ */
+public class TemporalRow
+{
+ private static final int NO_TTL = LivenessInfo.NO_TTL;
+ private static final long NO_TIMESTAMP = LivenessInfo.NO_TIMESTAMP;
+ private static final int NO_DELETION_TIME = DeletionTime.LIVE.localDeletionTime();
+
+ public interface Resolver
+ {
+ /**
+ * @param cells Iterable of all cells for a certain TemporalRow's Cell, in timestamp sorted order
+ * @return A single TemporalCell from the iterable which satisfies the resolution criteria, or null if
+ * there is no cell which qualifies
+ */
+ TemporalCell resolve(Iterable<TemporalCell> cells);
+ }
+
+ /**
+ * Returns the first value in the iterable if it is from the set of persisted cells, and the cell which results from
+ * reconciling it with the remaining cells does not have the same value; returns null otherwise.
+ */
+ public static final Resolver oldValueIfUpdated = cells -> {
+ Iterator<TemporalCell> iterator = cells.iterator();
+ if (!iterator.hasNext())
+ return null;
+
+ TemporalCell initial = iterator.next();
+ if (initial.isNew || !iterator.hasNext())
+ return null;
+
+ TemporalCell value = initial;
+ while (iterator.hasNext())
+ value = value.reconcile(iterator.next());
+
+ return ByteBufferUtil.compareUnsigned(initial.value, value.value) != 0 ? initial : null;
+ };
+
+ public static final Resolver earliest = cells -> {
+ Iterator<TemporalCell> iterator = cells.iterator();
+ if (!iterator.hasNext())
+ return null;
+ return iterator.next();
+ };
+
+ public static final Resolver latest = cells -> {
+ Iterator<TemporalCell> iterator = cells.iterator();
+ if (!iterator.hasNext())
+ return null;
+
+ TemporalCell value = iterator.next();
+ while (iterator.hasNext())
+ value = value.reconcile(iterator.next());
+
+ return value;
+ };
+
+ private static class TemporalCell
+ {
+ public final ByteBuffer value;
+ public final long timestamp;
+ public final int ttl;
+ public final int localDeletionTime;
+ public final boolean isNew;
+
+ private TemporalCell(ByteBuffer value, long timestamp, int ttl, int localDeletionTime, boolean isNew)
+ {
+ this.value = value;
+ this.timestamp = timestamp;
+ this.ttl = ttl;
+ this.localDeletionTime = localDeletionTime;
+ this.isNew = isNew;
+ }
+
+ public TemporalCell reconcile(TemporalCell that)
+ {
+ int now = FBUtilities.nowInSeconds();
+ Conflicts.Resolution resolution = Conflicts.resolveRegular(that.timestamp,
+ that.isLive(now),
+ that.localDeletionTime,
+ that.value,
+ this.timestamp,
+ this.isLive(now),
+ this.localDeletionTime,
+ this.value);
+ assert resolution != Conflicts.Resolution.MERGE;
+ if (resolution == Conflicts.Resolution.LEFT_WINS)
+ return that;
+ return this;
+ }
+
+ private boolean isLive(int now)
+ {
+ return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && now < localDeletionTime);
+ }
+
+ public Cell cell(ColumnDefinition definition, CellPath cellPath)
+ {
+ return new BufferCell(definition, timestamp, ttl, localDeletionTime, value, cellPath);
+ }
+ }
+
+ private final ColumnFamilyStore baseCfs;
+ private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
+ private final ByteBuffer basePartitionKey;
+ public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
+ public final int nowInSec;
+ private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>();
+ private int viewClusteringTtl = NO_TTL;
+ private long viewClusteringTimestamp = NO_TIMESTAMP;
+ private int viewClusteringLocalDeletionTime = NO_DELETION_TIME;
+
+ TemporalRow(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key, Row row, int nowInSec, boolean isNew)
+ {
+ this.baseCfs = baseCfs;
+ this.viewPrimaryKey = viewPrimaryKey;
+ this.basePartitionKey = key;
+ this.nowInSec = nowInSec;
+ clusteringColumns = new HashMap<>();
+ LivenessInfo liveness = row.primaryKeyLivenessInfo();
+ this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME);
+ this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP);
+ this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL);
+
+ List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns();
+ for (int i = 0; i < clusteringDefs.size(); i++)
+ {
+ ColumnDefinition cdef = clusteringDefs.get(i);
+ clusteringColumns.put(cdef.name, row.clustering().get(i));
+
+ addColumnValue(cdef.name, null, NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME, row.clustering().get(i), isNew);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TemporalRow that = (TemporalRow) o;
+
+ if (!clusteringColumns.equals(that.clusteringColumns)) return false;
+ if (!basePartitionKey.equals(that.basePartitionKey)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = basePartitionKey.hashCode();
+ result = 31 * result + clusteringColumns.hashCode();
+ return result;
+ }
+
+ public void addColumnValue(ColumnIdentifier identifier,
+ CellPath cellPath,
+ long timestamp,
+ int ttl,
+ int localDeletionTime,
+ ByteBuffer value, boolean isNew)
+ {
+ if (!columnValues.containsKey(identifier))
+ columnValues.put(identifier, new HashMap<>());
+
+ Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(identifier);
+
+ if (!innerMap.containsKey(cellPath))
+ innerMap.put(cellPath, new TreeMap<>());
+
+ // If this column is part of the view's primary keys
+ if (viewPrimaryKey.contains(identifier))
+ {
+ this.viewClusteringTtl = minValueIfSet(this.viewClusteringTtl, ttl, NO_TTL);
+ this.viewClusteringTimestamp = minValueIfSet(this.viewClusteringTimestamp, timestamp, NO_TIMESTAMP);
+ this.viewClusteringLocalDeletionTime = minValueIfSet(this.viewClusteringLocalDeletionTime, localDeletionTime, NO_DELETION_TIME);
+ }
+
+ innerMap.get(cellPath).put(timestamp, new TemporalCell(value, timestamp, ttl, localDeletionTime, isNew));
+ }
+
+ private static int minValueIfSet(int existing, int update, int defaultValue)
+ {
+ if (existing == defaultValue)
+ return update;
+ if (update == defaultValue)
+ return existing;
+ return Math.min(existing, update);
+ }
+
+ private static long minValueIfSet(long existing, long update, long defaultValue)
+ {
+ if (existing == defaultValue)
+ return update;
+ if (update == defaultValue)
+ return existing;
+ return Math.min(existing, update);
+ }
+
+ public int viewClusteringTtl()
+ {
+ return viewClusteringTtl;
+ }
+
+ public long viewClusteringTimestamp()
+ {
+ return viewClusteringTimestamp;
+ }
+
+ public int viewClusteringLocalDeletionTime()
+ {
+ return viewClusteringLocalDeletionTime;
+ }
+
+ public void addCell(Cell cell, boolean isNew)
+ {
+ addColumnValue(cell.column().name, cell.path(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), isNew);
+ }
+
+ // The definition may come from the view or the base table; it is resolved to the *base table* definition below
+ public ByteBuffer clusteringValue(ColumnDefinition definition, Resolver resolver)
+ {
+ ColumnDefinition baseDefinition = definition.cfName.equals(baseCfs.name)
+ ? definition
+ : baseCfs.metadata.getColumnDefinition(definition.name);
+
+ if (baseDefinition.isPartitionKey())
+ {
+ if (baseDefinition.isOnAllComponents())
+ return basePartitionKey;
+ else
+ {
+ CompositeType keyComparator = (CompositeType) baseCfs.metadata.getKeyValidator();
+ ByteBuffer[] components = keyComparator.split(basePartitionKey);
+ return components[baseDefinition.position()];
+ }
+ }
+ else
+ {
+ ColumnIdentifier columnIdentifier = baseDefinition.name;
+
+ if (clusteringColumns.containsKey(columnIdentifier))
+ return clusteringColumns.get(columnIdentifier);
+
+ Collection<org.apache.cassandra.db.rows.Cell> val = values(definition, resolver);
+ if (val != null && val.size() == 1)
+ return Iterables.getOnlyElement(val).value();
+ }
+ return null;
+ }
+
+ public DeletionTime deletionTime(AbstractThreadUnsafePartition partition)
+ {
+ DeletionInfo deletionInfo = partition.deletionInfo();
+ if (!deletionInfo.getPartitionDeletion().isLive())
+ return deletionInfo.getPartitionDeletion();
+
+ Clustering baseClustering = baseClusteringBuilder().build();
+ RangeTombstone clusterTombstone = deletionInfo.rangeCovering(baseClustering);
+ if (clusterTombstone != null)
+ return clusterTombstone.deletionTime();
+
+ Row row = partition.getRow(baseClustering);
+ return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion();
+ }
+
+ public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition, Resolver resolver)
+ {
+ Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(definition.name);
+ if (innerMap == null)
+ {
+ return Collections.emptyList();
+ }
+
+ Collection<org.apache.cassandra.db.rows.Cell> value = new ArrayList<>();
+ for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> pathAndCells : innerMap.entrySet())
+ {
+ TemporalCell cell = resolver.resolve(pathAndCells.getValue().values());
+
+ if (cell != null)
+ value.add(cell.cell(definition, pathAndCells.getKey()));
+ }
+ return value;
+ }
+
+ public Slice baseSlice()
+ {
+ return baseClusteringBuilder().buildSlice();
+ }
+
+ private CBuilder baseClusteringBuilder()
+ {
+ CFMetaData metadata = baseCfs.metadata;
+ CBuilder builder = CBuilder.create(metadata.comparator);
+
+ ByteBuffer[] buffers = new ByteBuffer[clusteringColumns.size()];
+ for (Map.Entry<ColumnIdentifier, ByteBuffer> buffer : clusteringColumns.entrySet())
+ buffers[metadata.getColumnDefinition(buffer.getKey()).position()] = buffer.getValue();
+
+ for (ByteBuffer byteBuffer : buffers)
+ builder = builder.add(byteBuffer);
+
+ return builder;
+ }
+
+ static class Set implements Iterable<TemporalRow>
+ {
+ private final ColumnFamilyStore baseCfs;
+ private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
+ private final ByteBuffer key;
+ public final DecoratedKey dk;
+ private final Map<Clustering, TemporalRow> clusteringToRow;
+ final int nowInSec = FBUtilities.nowInSeconds();
+
+ Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key)
+ {
+ this.baseCfs = baseCfs;
+ this.viewPrimaryKey = viewPrimaryKey;
+ this.key = key;
+ this.dk = baseCfs.partitioner.decorateKey(key);
+ this.clusteringToRow = new HashMap<>();
+ }
+
+ public Iterator<TemporalRow> iterator()
+ {
+ return clusteringToRow.values().iterator();
+ }
+
+ public TemporalRow getClustering(Clustering clustering)
+ {
+ return clusteringToRow.get(clustering);
+ }
+
+ public void addRow(Row row, boolean isNew)
+ {
+ TemporalRow temporalRow = clusteringToRow.get(row.clustering());
+ if (temporalRow == null)
+ {
+ temporalRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row, nowInSec, isNew);
+ clusteringToRow.put(row.clustering(), temporalRow);
+ }
+
+ for (Cell cell: row.cells())
+ {
+ temporalRow.addCell(cell, isNew);
+ }
+ }
+
+ public int size()
+ {
+ return clusteringToRow.size();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 8f52bb1..9c886b0 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -87,7 +87,7 @@ public class KeyspaceMetrics
public final LatencyMetrics casPropose;
/** CAS Commit metrics */
public final LatencyMetrics casCommit;
-
+
public final MetricNameFactory factory;
private Keyspace keyspace;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
new file mode 100644
index 0000000..39a5574
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class MVWriteMetrics extends ClientRequestMetrics
+{
+ public final Counter viewReplicasAttempted;
+ public final Counter viewReplicasSuccess;
+
+ public MVWriteMetrics(String scope) {
+ super(scope);
+ viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
+ viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
+ }
+
+ public void release()
+ {
+ super.release();
+ Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
+ Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index a5f3601..4f15da2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -134,6 +134,8 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PROPOSE,
PAXOS_COMMIT,
@Deprecated PAGED_RANGE,
+ BATCHLOG_MUTATION,
+ MATERIALIZED_VIEW_MUTATION,
// remember to add new verbs at the end, since we serialize by ordinal
UNUSED_1,
UNUSED_2,
@@ -145,6 +147,8 @@ public final class MessagingService implements MessagingServiceMBean
{{
put(Verb.MUTATION, Stage.MUTATION);
put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
+ put(Verb.MATERIALIZED_VIEW_MUTATION, Stage.MATERIALIZED_VIEW_MUTATION);
+ put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
put(Verb.READ_REPAIR, Stage.MUTATION);
put(Verb.TRUNCATE, Stage.MUTATION);
put(Verb.PAXOS_PREPARE, Stage.MUTATION);
@@ -203,6 +207,8 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
put(Verb.MUTATION, Mutation.serializer);
+ put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
+ put(Verb.MATERIALIZED_VIEW_MUTATION, Mutation.serializer);
put(Verb.READ_REPAIR, Mutation.serializer);
put(Verb.READ, ReadCommand.serializer);
//put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
@@ -229,6 +235,8 @@ public final class MessagingService implements MessagingServiceMBean
public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
{{
put(Verb.MUTATION, WriteResponse.serializer);
+ put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
+ put(Verb.MATERIALIZED_VIEW_MUTATION, WriteResponse.serializer);
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
@@ -291,6 +299,8 @@ public final class MessagingService implements MessagingServiceMBean
*/
public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE,
Verb.MUTATION,
+ Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
+ Verb.MATERIALIZED_VIEW_MUTATION,
Verb.COUNTER_MUTATION,
Verb.READ_REPAIR,
Verb.READ,
@@ -618,7 +628,10 @@ public final class MessagingService implements MessagingServiceMBean
ConsistencyLevel consistencyLevel,
boolean allowHints)
{
- assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
+ assert message.verb == Verb.MUTATION
+ || message.verb == Verb.BATCHLOG_MUTATION
+ || message.verb == Verb.MATERIALIZED_VIEW_MUTATION
+ || message.verb == Verb.COUNTER_MUTATION;
int messageId = nextId();
CallbackInfo previous = callbacks.put(messageId,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 7326fa9..1c21e41 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -307,7 +307,9 @@ public final class LegacySchemaMigrator
defaultValidator);
}
- CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs);
+ // The legacy schema did not have views, so we know that we are not loading a materialized view
+ boolean isMaterializedView = false;
+ CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, columnDefs);
cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/MaterializedViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MaterializedViews.java b/src/java/org/apache/cassandra/schema/MaterializedViews.java
new file mode 100644
index 0000000..1c55736
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/MaterializedViews.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.MaterializedViewDefinition;
+
+import static com.google.common.collect.Iterables.filter;
+
+public final class MaterializedViews implements Iterable<MaterializedViewDefinition>
+{
+ private final ImmutableMap<String, MaterializedViewDefinition> materializedViews;
+
+ private MaterializedViews(Builder builder)
+ {
+ materializedViews = builder.materializedViews.build();
+ }
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ public static MaterializedViews none()
+ {
+ return builder().build();
+ }
+
+ public Iterator<MaterializedViewDefinition> iterator()
+ {
+ return materializedViews.values().iterator();
+ }
+
+ public int size()
+ {
+ return materializedViews.size();
+ }
+
+ public boolean isEmpty()
+ {
+ return materializedViews.isEmpty();
+ }
+
+ /**
+ * Get the materialized view with the specified name
+ *
+ * @param name a non-qualified materialized view name
+ * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link MaterializedViewDefinition} otherwise
+ */
+ public Optional<MaterializedViewDefinition> get(String name)
+ {
+ return Optional.ofNullable(materializedViews.get(name));
+ }
+
+ /**
+ * Create a MaterializedViews instance with the provided materialized view added
+ */
+ public MaterializedViews with(MaterializedViewDefinition materializedView)
+ {
+ if (get(materializedView.viewName).isPresent())
+ throw new IllegalStateException(String.format("Materialized View %s already exists", materializedView.viewName));
+
+ return builder().add(this).add(materializedView).build();
+ }
+
+ /**
+ * Creates a MaterializedViews instance with the materialized view of the provided name removed
+ */
+ public MaterializedViews without(String name)
+ {
+ MaterializedViewDefinition materializedView =
+ get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
+
+ return builder().add(filter(this, v -> v != materializedView)).build();
+ }
+
+ /**
+ * Creates a MaterializedViews instance which contains an updated materialized view
+ */
+ public MaterializedViews replace(MaterializedViewDefinition materializedView)
+ {
+ return without(materializedView.viewName).with(materializedView);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return this == o || (o instanceof MaterializedViews && materializedViews.equals(((MaterializedViews) o).materializedViews));
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return materializedViews.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return materializedViews.values().toString();
+ }
+
+ public static final class Builder
+ {
+ final ImmutableMap.Builder<String, MaterializedViewDefinition> materializedViews = new ImmutableMap.Builder<>();
+
+ private Builder()
+ {
+ }
+
+ public MaterializedViews build()
+ {
+ return new MaterializedViews(this);
+ }
+
+ public Builder add(MaterializedViewDefinition materializedView)
+ {
+ materializedViews.put(materializedView.viewName, materializedView);
+ return this;
+ }
+
+ public Builder add(Iterable<MaterializedViewDefinition> materializedViews)
+ {
+ materializedViews.forEach(this::add);
+ return this;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 2bc7b0c..2150f4a 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -76,12 +76,13 @@ public final class SchemaKeyspace
public static final String COLUMNS = "columns";
public static final String DROPPED_COLUMNS = "dropped_columns";
public static final String TRIGGERS = "triggers";
+ public static final String MATERIALIZED_VIEWS = "materialized_views";
public static final String TYPES = "types";
public static final String FUNCTIONS = "functions";
public static final String AGGREGATES = "aggregates";
public static final List<String> ALL =
- ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES);
+ ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES);
private static final CFMetaData Keyspaces =
compile(KEYSPACES,
@@ -152,6 +153,18 @@ public final class SchemaKeyspace
+ "trigger_options map<text, text>,"
+ "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
+ private static final CFMetaData MaterializedViews =
+ compile(MATERIALIZED_VIEWS,
+ "materialized views definitions",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "table_name text,"
+ + "view_name text,"
+ + "target_columns list<text>,"
+ + "clustering_columns list<text>,"
+ + "included_columns list<text>,"
+ + "PRIMARY KEY ((keyspace_name), table_name, view_name))");
+
private static final CFMetaData Types =
compile(TYPES,
"user defined type definitions",
@@ -193,7 +206,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
public static final List<CFMetaData> All =
- ImmutableList.of(Keyspaces, Tables, Columns, DroppedColumns, Triggers, Types, Functions, Aggregates);
+ ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates);
private static CFMetaData compile(String name, String description, String schema)
{
@@ -688,6 +701,8 @@ public final class SchemaKeyspace
Mutation mutation = new Mutation(NAME, getSchemaKSDecoratedKey(keyspace.name));
for (CFMetaData schemaTable : All)
mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
+ mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltMaterializedViews, mutation.key(), timestamp, nowInSec));
+ mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.MaterializedViewsBuildsInProgress, mutation.key(), timestamp, nowInSec));
return mutation;
}
@@ -830,6 +845,9 @@ public final class SchemaKeyspace
for (TriggerMetadata trigger : table.getTriggers())
addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
+
+ for (MaterializedViewDefinition materializedView: table.getMaterializedViews())
+ addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
}
}
@@ -923,6 +941,22 @@ public final class SchemaKeyspace
for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values())
addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
+ MapDifference<String, MaterializedViewDefinition> materializedViewDiff = materializedViewsDiff(oldTable.getMaterializedViews(), newTable.getMaterializedViews());
+
+ // dropped materialized views
+ for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnLeft().values())
+ dropMaterializedViewFromSchemaMutation(oldTable, materializedView, timestamp, mutation);
+
+ // newly created materialized views
+ for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values())
+ addMaterializedViewToSchemaMutation(oldTable, materializedView, timestamp, mutation);
+
+ // updated materialized views need to be updated
+ for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values())
+ {
+ addUpdatedMaterializedViewDefinitionToSchemaMutation(oldTable, diff.rightValue(), timestamp, mutation);
+ }
+
return mutation;
}
@@ -937,6 +971,17 @@ public final class SchemaKeyspace
return Maps.difference(beforeMap, afterMap);
}
+ /**
+  * Computes the difference between two sets of materialized view definitions,
+  * matching definitions by view name.
+  *
+  * @param before the definitions prior to the change
+  * @param after the definitions following the change
+  * @return the per-view-name difference (left = before, right = after)
+  */
+ private static MapDifference<String, MaterializedViewDefinition> materializedViewsDiff(MaterializedViews before, MaterializedViews after)
+ {
+     Map<String, MaterializedViewDefinition> previousByName = new HashMap<>();
+     for (MaterializedViewDefinition view : before)
+         previousByName.put(view.viewName, view);
+
+     Map<String, MaterializedViewDefinition> currentByName = new HashMap<>();
+     for (MaterializedViewDefinition view : after)
+         currentByName.put(view.viewName, view);
+
+     return Maps.difference(previousByName, currentByName);
+ }
+
public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -950,6 +995,9 @@ public final class SchemaKeyspace
for (TriggerMetadata trigger : table.getTriggers())
dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
+ for (MaterializedViewDefinition materializedView : table.getMaterializedViews())
+ dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation);
+
return mutation;
}
@@ -1014,8 +1062,12 @@ public final class SchemaKeyspace
Triggers triggers =
readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
+ MaterializedViews views =
+ readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition);
+
return createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
- .triggers(triggers);
+ .triggers(triggers)
+ .materializedViews(views);
}
public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
@@ -1032,8 +1084,9 @@ public final class SchemaKeyspace
boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+ boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW);
- CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, columns);
+ CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, columns);
Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction"));
Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
@@ -1221,6 +1274,103 @@ public final class SchemaKeyspace
}
/*
+ * Materialized view metadata serialization/deserialization.
+ */
+
+ /**
+  * Adds a row describing {@code materializedView} to the materialized views
+  * schema table, as part of {@code mutation}.
+  *
+  * The row is clustered by (base table name, view name). The view's partition
+  * columns are stored in the {@code target_columns} list.
+  */
+ private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+ {
+     RowUpdateBuilder row = new RowUpdateBuilder(MaterializedViews, timestamp, mutation);
+     row = row.clustering(table.cfName, materializedView.viewName);
+
+     for (ColumnIdentifier column : materializedView.partitionColumns)
+         row.addListEntry("target_columns", column.toString());
+
+     for (ColumnIdentifier column : materializedView.clusteringColumns)
+         row = row.addListEntry("clustering_columns", column.toString());
+
+     for (ColumnIdentifier column : materializedView.included)
+         row = row.addListEntry("included_columns", column.toString());
+
+     row.build();
+ }
+
+ private static void dropMaterializedViewFromSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+ {
+ RowUpdateBuilder.deleteRow(MaterializedViews, timestamp, mutation, table.cfName, materializedView.viewName);
+ }
+
+ /**
+  * Rewrites the schema row of an already-existing view with its new definition.
+  *
+  * Unlike a plain add, each list column is reset before the new entries are
+  * appended, so entries from the previous definition are not kept.
+  */
+ private static void addUpdatedMaterializedViewDefinitionToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+ {
+     RowUpdateBuilder row = new RowUpdateBuilder(MaterializedViews, timestamp, mutation);
+     row = row.clustering(table.cfName, materializedView.viewName);
+
+     // partition columns of the view
+     row.resetCollection("target_columns");
+     for (ColumnIdentifier column : materializedView.partitionColumns)
+         row.addListEntry("target_columns", column.toString());
+
+     // clustering columns of the view
+     row.resetCollection("clustering_columns");
+     for (ColumnIdentifier column : materializedView.clusteringColumns)
+         row = row.addListEntry("clustering_columns", column.toString());
+
+     // columns included in the view
+     row.resetCollection("included_columns");
+     for (ColumnIdentifier column : materializedView.included)
+         row = row.addListEntry("included_columns", column.toString());
+
+     row.build();
+ }
+
+ /**
+  * Deserializes materialized view definitions from their storage-level representation.
+  *
+  * @param partition storage-level partition containing the materialized view definitions
+  * @return a {@link org.apache.cassandra.schema.MaterializedViews} holding one definition per row of the partition
+  */
+ private static MaterializedViews createMaterializedViewsFromMaterializedViewsPartition(RowIterator partition)
+ {
+     String select = String.format("SELECT * FROM %s.%s", NAME, MATERIALIZED_VIEWS);
+     // Fully qualified on purpose: this class also declares a field named MaterializedViews.
+     MaterializedViews.Builder builder = org.apache.cassandra.schema.MaterializedViews.builder();
+     for (UntypedResultSet.Row row : QueryProcessor.resultify(select, partition))
+         builder.add(createMaterializedViewFromMaterializedViewRow(row));
+     return builder.build();
+ }
+
+ /**
+  * Builds a {@link MaterializedViewDefinition} from one row of the materialized
+  * views schema table.
+  *
+  * The base table name is read from {@code table_name}, which is the column the
+  * schema table declares and the one the serialization methods write to; the
+  * table has no {@code columnfamily_name} column.
+  *
+  * @param row a row of the materialized views schema table
+  * @return the definition described by the row
+  */
+ private static MaterializedViewDefinition createMaterializedViewFromMaterializedViewRow(UntypedResultSet.Row row)
+ {
+     String name = row.getString("view_name");
+     String cfName = row.getString("table_name");
+
+     List<String> partitionColumnNames = row.getList("target_columns", UTF8Type.instance);
+     List<ColumnIdentifier> partitionColumns = new ArrayList<>();
+     for (String columnName : partitionColumnNames)
+         partitionColumns.add(ColumnIdentifier.getInterned(columnName, true));
+
+     // NOTE(review): getList appears to return null when the list column has no value
+     // (included_columns was already guarded this way); treat a missing
+     // clustering_columns list as "no clustering columns" rather than failing with an NPE.
+     List<String> clusteringColumnNames = row.getList("clustering_columns", UTF8Type.instance);
+     List<ColumnIdentifier> clusteringColumns = new ArrayList<>();
+     if (clusteringColumnNames != null)
+     {
+         for (String columnName : clusteringColumnNames)
+             clusteringColumns.add(ColumnIdentifier.getInterned(columnName, true));
+     }
+
+     List<String> includedColumnNames = row.getList("included_columns", UTF8Type.instance);
+     Set<ColumnIdentifier> includedColumns = new HashSet<>();
+     if (includedColumnNames != null)
+     {
+         for (String columnName : includedColumnNames)
+             includedColumns.add(ColumnIdentifier.getInterned(columnName, true));
+     }
+
+     return new MaterializedViewDefinition(cfName,
+                                           name,
+                                           partitionColumns,
+                                           clusteringColumns,
+                                           includedColumns);
+ }
+
+ /*
* UDF metadata serialization/deserialization.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 6696e10..9a57f45 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -196,7 +196,7 @@ public abstract class AbstractReadExecutor
return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
}
- private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
+ public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
{
public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 8978034..e3ba66e 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -47,7 +47,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
public final ConsistencyLevel consistencyLevel;
protected final Runnable callback;
protected final Collection<InetAddress> pendingEndpoints;
- private final WriteType writeType;
+ protected final WriteType writeType;
private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
= AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
private volatile int failures = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
new file mode 100644
index 0000000..ac44923
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.net.MessageIn;
+
+public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
+{
+ AbstractWriteResponseHandler<T> wrapped;
+ BatchlogCleanup cleanup;
+ protected volatile int requiredBeforeFinish;
+ private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> requiredBeforeFinishUpdater
+ = AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, "requiredBeforeFinish");
+
+ public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup)
+ {
+ super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType);
+ this.wrapped = wrapped;
+ this.requiredBeforeFinish = requiredBeforeFinish;
+ this.cleanup = cleanup;
+ }
+
+ protected int ackCount()
+ {
+ return wrapped.ackCount();
+ }
+
+ public void response(MessageIn<T> msg)
+ {
+ wrapped.response(msg);
+ if (requiredBeforeFinishUpdater.decrementAndGet(this) == 0)
+ cleanup.run();
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return wrapped.isLatencyForSnitch();
+ }
+
+ public void onFailure(InetAddress from)
+ {
+ wrapped.onFailure(from);
+ }
+
+ public void assureSufficientLiveNodes()
+ {
+ wrapped.assureSufficientLiveNodes();
+ }
+
+ public void get() throws WriteTimeoutException, WriteFailureException
+ {
+ wrapped.get();
+ }
+
+ protected int totalBlockFor()
+ {
+ return wrapped.totalBlockFor();
+ }
+
+ protected int totalEndpoints()
+ {
+ return wrapped.totalEndpoints();
+ }
+
+ protected boolean waitingFor(InetAddress from)
+ {
+ return wrapped.waitingFor(from);
+ }
+
+ protected void signal()
+ {
+ wrapped.signal();
+ }
+
+ public static class BatchlogCleanup
+ {
+ private final BatchlogCleanupCallback callback;
+
+ protected volatile int mutationsWaitingFor;
+ private static final AtomicIntegerFieldUpdater<BatchlogCleanup> mutationsWaitingForUpdater
+ = AtomicIntegerFieldUpdater.newUpdater(BatchlogCleanup.class, "mutationsWaitingFor");
+
+ public BatchlogCleanup(int mutationsWaitingFor, BatchlogCleanupCallback callback)
+ {
+ this.mutationsWaitingFor = mutationsWaitingFor;
+ this.callback = callback;
+ }
+
+ public void run()
+ {
+ if (mutationsWaitingForUpdater.decrementAndGet(this) == 0)
+ callback.invoke();
+ }
+ }
+
+ public interface BatchlogCleanupCallback
+ {
+ void invoke();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 548cbc7..fddf593 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -287,6 +287,24 @@ public class CassandraDaemon
}
}
+ // Ask every column family store, in every keyspace, to build all of its materialized views.
+ Runnable viewRebuild = () ->
+ {
+     for (Keyspace keyspace : Keyspace.all())
+         for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+             cfs.materializedViewManager.buildAllViews();
+ };
+
+ // Run once on the optional-tasks executor, RING_DELAY milliseconds from now.
+ ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+
+
SystemKeyspace.finishStartup();
// start server internals
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2c5a2ab..be11c77 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,6 +31,10 @@ import com.google.common.base.Predicate;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.db.view.MaterializedViewUtils;
+import org.apache.cassandra.metrics.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +62,6 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.metrics.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.paxos.*;
import org.apache.cassandra.tracing.Tracing;
@@ -91,6 +94,7 @@ public class StorageProxy implements StorageProxyMBean
private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
+ private static final MVWriteMetrics mvWriteMetrics = new MVWriteMetrics("MVWrite");
private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
@@ -118,7 +122,7 @@ public class StorageProxy implements StorageProxyMBean
throws OverloadedException
{
assert mutation instanceof Mutation;
- sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter);
+ sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
}
};
@@ -622,6 +626,80 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
}
+ /**
+  * Applies materialized view mutations to the view replicas paired with the
+  * base partition identified by {@code dataKey}.
+  *
+  * A single batchlog entry covering all {@code mutations} is applied locally,
+  * then each mutation is sent asynchronously at consistency level ONE; this
+  * method does not wait for the replicas to respond. The batchlog entry is
+  * removed asynchronously once every mutation's handler has run the cleanup.
+  *
+  * @param dataKey the partition key of the base table update the view mutations derive from
+  * @param mutations the view mutations to be applied across the replicas
+  */
+ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations)
+ throws UnavailableException, OverloadedException, WriteTimeoutException
+ {
+     Tracing.trace("Determining replicas for mutation");
+     final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+
+     long startTime = System.nanoTime();
+     List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
+
+     try
+     {
+         Token baseToken = StorageService.getPartitioner().getToken(dataKey);
+
+         ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+
+         //Since the base -> view replication is 1:1 we only need to store the BL locally
+         final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
+         final UUID batchUUID = UUIDGen.getTimeUUID();
+         BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
+                                                                                                       () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+
+         // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
+         for (Mutation mutation : mutations)
+         {
+             String keyspaceName = mutation.getKeyspaceName();
+             Token tk = mutation.key().getToken();
+             List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk));
+
+             WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
+                                                                              consistencyLevel,
+                                                                              consistencyLevel,
+                                                                              naturalEndpoints,
+                                                                              WriteType.BATCH,
+                                                                              cleanup);
+
+             wrappers.add(wrapper);
+         }
+
+         // Apply to local batchlog memtable in this thread. The batchlog mutation already
+         // covers the whole collection, so it is written once rather than once per mutation.
+         // Nothing is written for an empty collection: no handler would ever trigger its removal.
+         if (!mutations.isEmpty())
+             BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
+
+         // now actually initiate the writes; responses are handled asynchronously
+         asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+     }
+     catch (WriteTimeoutException ex)
+     {
+         mvWriteMetrics.timeouts.mark();
+         Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+         throw ex;
+     }
+     catch (UnavailableException e)
+     {
+         mvWriteMetrics.unavailables.mark();
+         Tracing.trace("Unavailable");
+         throw e;
+     }
+     catch (OverloadedException e)
+     {
+         // overload is recorded under the "unavailables" meter
+         mvWriteMetrics.unavailables.mark();
+         Tracing.trace("Overloaded");
+         throw e;
+     }
+     finally
+     {
+         mvWriteMetrics.addNano(System.nanoTime() - startTime);
+     }
+ }
+
@SuppressWarnings("unchecked")
public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
ConsistencyLevel consistencyLevel,
@@ -630,12 +708,17 @@ public class StorageProxy implements StorageProxyMBean
{
Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
+ boolean updatesView = MaterializedViewManager.updatesAffectView(mutations, true);
+
if (augmented != null)
- mutateAtomically(augmented, consistencyLevel);
- else if (mutateAtomically)
- mutateAtomically((Collection<Mutation>) mutations, consistencyLevel);
+ mutateAtomically(augmented, consistencyLevel, updatesView);
else
- mutate(mutations, consistencyLevel);
+ {
+ if (mutateAtomically || updatesView)
+ mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView);
+ else
+ mutate(mutations, consistencyLevel);
+ }
}
/**
@@ -646,8 +729,11 @@ public class StorageProxy implements StorageProxyMBean
*
* @param mutations the Mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
+ * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog
*/
- public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level)
+ public static void mutateAtomically(Collection<Mutation> mutations,
+ ConsistencyLevel consistency_level,
+ boolean requireQuorumForRemove)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for atomic batch");
@@ -658,25 +744,49 @@ public class StorageProxy implements StorageProxyMBean
try
{
+
+ // If we are requiring quorum nodes for removal, we upgrade consistency level to QUORUM unless we already
+ // require ALL, or EACH_QUORUM. This is so that *at least* QUORUM nodes see the update.
+ ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove
+ ? ConsistencyLevel.QUORUM
+ : consistency_level;
+
+ switch (consistency_level)
+ {
+ case ALL:
+ case EACH_QUORUM:
+ batchConsistencyLevel = consistency_level;
+ }
+
+ final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+ final UUID batchUUID = UUIDGen.getTimeUUID();
+ BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
+ new BatchlogResponseHandler.BatchlogCleanupCallback()
+ {
+ public void invoke()
+ {
+ asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
+ }
+ });
+
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
{
- WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level, WriteType.BATCH);
+ WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation,
+ consistency_level,
+ batchConsistencyLevel,
+ WriteType.BATCH,
+ cleanup);
// exit early if we can't fulfill the CL at this time.
wrapper.handler.assureSufficientLiveNodes();
wrappers.add(wrapper);
}
// write to the batchlog
- Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, consistency_level);
- UUID batchUUID = UUIDGen.getTimeUUID();
syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID);
// now actually perform the writes and wait for them to complete
- syncWriteBatchedMutations(wrappers, localDataCenter);
-
- // remove the batchlog entries asynchronously
- asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
+ syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
}
catch (UnavailableException e)
{
@@ -719,13 +829,13 @@ public class StorageProxy implements StorageProxyMBean
WriteType.BATCH_LOG);
MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
- .createMessage();
+ .createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
for (InetAddress target : endpoints)
{
int targetVersion = MessagingService.instance().getVersion(target);
if (canDoLocalRequest(target))
{
- insertLocal(message.payload, handler);
+ insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
}
else if (targetVersion == MessagingService.current_version)
{
@@ -734,7 +844,7 @@ public class StorageProxy implements StorageProxyMBean
else
{
MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
- .createMessage(),
+ .createMessage(MessagingService.Verb.BATCHLOG_MUTATION),
target,
handler,
false);
@@ -754,25 +864,43 @@ public class StorageProxy implements StorageProxyMBean
WriteType.SIMPLE);
Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid)));
mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
- MessageOut<Mutation> message = mutation.createMessage();
+ MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
for (InetAddress target : endpoints)
{
if (canDoLocalRequest(target))
- insertLocal(message.payload, handler);
+ insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
else
MessagingService.instance().sendRR(message, target, handler, false);
}
}
- private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter)
+ /**
+  * Sends each wrapped mutation to its natural and pending endpoints without
+  * waiting for the responses.
+  *
+  * An {@link OverloadedException} or {@link WriteTimeoutException} raised while
+  * sending is not propagated; it is reported to that mutation's handler as a
+  * failure from the local broadcast address, and the remaining mutations are still sent.
+  */
+ private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
+ {
+     for (WriteResponseHandlerWrapper pending : wrappers)
+     {
+         AbstractWriteResponseHandler<IMutation> handler = pending.handler;
+         Iterable<InetAddress> targets = Iterables.concat(handler.naturalEndpoints, handler.pendingEndpoints);
+
+         try
+         {
+             sendToHintedEndpoints(pending.mutation, targets, pending.handler, localDataCenter, stage);
+         }
+         catch (OverloadedException | WriteTimeoutException e)
+         {
+             pending.handler.onFailure(FBUtilities.getBroadcastAddress());
+         }
+     }
+ }
+
+ private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
throws WriteTimeoutException, OverloadedException
{
for (WriteResponseHandlerWrapper wrapper : wrappers)
{
Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
- sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter);
+ sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
}
+
for (WriteResponseHandlerWrapper wrapper : wrappers)
wrapper.handler.get();
}
@@ -815,25 +943,52 @@ public class StorageProxy implements StorageProxyMBean
return responseHandler;
}
- // same as above except does not initiate writes (but does perform availability checks).
- private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
+ // same as performWrites except does not initiate writes (but does perform availability checks).
+ private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation,
+ ConsistencyLevel consistency_level,
+ ConsistencyLevel batchConsistencyLevel,
+ WriteType writeType,
+ BatchlogResponseHandler.BatchlogCleanup cleanup)
{
- AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
+ Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+ AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
- AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
- return new WriteResponseHandlerWrapper(responseHandler, mutation);
+ AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+ BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+ return new WriteResponseHandlerWrapper(batchHandler, mutation);
+ }
+
+ /**
+  * Builds the response handler for one materialized view mutation without
+  * initiating any write. The natural endpoints are supplied by the caller;
+  * pending endpoints are looked up from the token metadata. The resulting
+  * handler is an {@code MVWriteMetricsWrapped}, so MVWriteMetrics are tracked.
+  */
+ private static WriteResponseHandlerWrapper wrapMVBatchResponseHandler(Mutation mutation,
+                                                                      ConsistencyLevel consistency_level,
+                                                                      ConsistencyLevel batchConsistencyLevel,
+                                                                      List<InetAddress> naturalEndpoints,
+                                                                      WriteType writeType,
+                                                                      BatchlogResponseHandler.BatchlogCleanup cleanup)
+ {
+     Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+     String keyspaceName = mutation.getKeyspaceName();
+     Token token = mutation.key().getToken();
+
+     Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName);
+
+     AbstractReplicationStrategy strategy = keyspace.getReplicationStrategy();
+     AbstractWriteResponseHandler<IMutation> delegate = strategy.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+
+     int requiredResponses = batchConsistencyLevel.blockFor(keyspace);
+     BatchlogResponseHandler<IMutation> batchHandler = new MVWriteMetricsWrapped(delegate, requiredResponses, cleanup);
+     return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
// used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints.
private static class WriteResponseHandlerWrapper
{
- final AbstractWriteResponseHandler<IMutation> handler;
+ final BatchlogResponseHandler<IMutation> handler;
final Mutation mutation;
- WriteResponseHandlerWrapper(AbstractWriteResponseHandler<IMutation> handler, Mutation mutation)
+ WriteResponseHandlerWrapper(BatchlogResponseHandler<IMutation> handler, Mutation mutation)
{
this.handler = handler;
this.mutation = mutation;
@@ -886,7 +1041,8 @@ public class StorageProxy implements StorageProxyMBean
public static void sendToHintedEndpoints(final Mutation mutation,
Iterable<InetAddress> targets,
AbstractWriteResponseHandler<IMutation> responseHandler,
- String localDataCenter)
+ String localDataCenter,
+ Stage stage)
throws OverloadedException
{
// extra-datacenter replicas, grouped by dc
@@ -950,7 +1106,7 @@ public class StorageProxy implements StorageProxyMBean
}
if (insertLocal)
- insertLocal(mutation, responseHandler);
+ insertLocal(stage, mutation, responseHandler);
if (dcGroups != null)
{
@@ -1059,10 +1215,9 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
+ private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
{
-
- StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
+ StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
{
public void runMayThrow()
{
@@ -1073,7 +1228,8 @@ public class StorageProxy implements StorageProxyMBean
}
catch (Exception ex)
{
- logger.error("Failed to apply mutation locally : {}", ex);
+ if (!(ex instanceof WriteTimeoutException))
+ logger.error("Failed to apply mutation locally : {}", ex);
responseHandler.onFailure(FBUtilities.getBroadcastAddress());
}
}
@@ -1193,7 +1349,7 @@ public class StorageProxy implements StorageProxyMBean
Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
ImmutableSet.of(FBUtilities.getBroadcastAddress()));
if (!remotes.isEmpty())
- sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
+ sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
}
};
}
@@ -2123,6 +2279,24 @@ public class StorageProxy implements StorageProxyMBean
}
/**
+ * This class captures metrics for materialized view writes: a BatchlogResponseHandler that additionally updates the mvWriteMetrics counters viewReplicasAttempted and viewReplicasSuccess.
+ */
+ private static class MVWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
+ {
+ public MVWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup) // all three arguments are forwarded unchanged to the BatchlogResponseHandler constructor
+ {
+ super(writeHandler, i, cleanup);
+ mvWriteMetrics.viewReplicasAttempted.inc(totalEndpoints()); // counted once, at construction time, by the value of totalEndpoints()
+ }
+
+ public void response(MessageIn<IMutation> msg) // extends the superclass response handling with success accounting
+ {
+ super.response(msg);
+ mvWriteMetrics.viewReplicasSuccess.inc(); // one increment per response() call, after the superclass has processed the message
+ }
+ }
+
+ /**
* A Runnable that aborts if it doesn't start running before it times out
*/
private static abstract class DroppableRunnable implements Runnable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index da53bf7..5049337 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -297,6 +297,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
/* register the verb handlers */
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
+ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MATERIALIZED_VIEW_MUTATION, new MutationVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler());
@@ -629,9 +631,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void runMayThrow() throws InterruptedException
{
inShutdownHook = true;
+ ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
+ ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
- if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
+ if (mutationStage.isShutdown()
+ && counterMutationStage.isShutdown()
+ && batchlogMutationStage.isShutdown()
+ && materializedViewMutationStage.isShutdown())
return; // drained already
if (daemon != null)
@@ -642,8 +649,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// In-progress writes originating here could generate hints to be written, so shut down MessagingService
// before mutation stage, so we can get all the hints saved before shutting down
MessagingService.instance().shutdown();
+ materializedViewMutationStage.shutdown();
+ batchlogMutationStage.shutdown();
counterMutationStage.shutdown();
mutationStage.shutdown();
+ materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+ batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
@@ -3820,8 +3831,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
inShutdownHook = true;
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
+ ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
+ ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
- if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
+ if (mutationStage.isTerminated()
+ && counterMutationStage.isTerminated()
+ && batchlogMutationStage.isTerminated()
+ && materializedViewMutationStage.isTerminated())
{
logger.warn("Cannot drain node (did it already happen?)");
return;
@@ -3835,8 +3851,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().shutdown();
setMode(Mode.DRAINING, "clearing mutation stage", false);
+ materializedViewMutationStage.shutdown();
+ batchlogMutationStage.shutdown();
counterMutationStage.shutdown();
mutationStage.shutdown();
+ materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+ batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);