You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 16:35:16 UTC
[5/6] cassandra git commit: Improve MV schema representation
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
new file mode 100644
index 0000000..1a020ce
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.selection.RawSelector;
+import org.apache.cassandra.cql3.selection.Selectable;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.transport.Event;
+
+public class CreateViewStatement extends SchemaAlteringStatement
+{
+ private final CFName baseName;
+ private final List<RawSelector> selectClause;
+ private final List<ColumnIdentifier.Raw> notNullWhereClause;
+ private final List<ColumnIdentifier.Raw> partitionKeys;
+ private final List<ColumnIdentifier.Raw> clusteringKeys;
+ public final CFProperties properties = new CFProperties();
+ private final boolean ifNotExists;
+
+    /**
+     * Builds the statement from the parsed pieces of a CREATE MATERIALIZED VIEW query.
+     *
+     * @param viewName           name of the view to create (handed to the superclass)
+     * @param baseName           name of the base table the view is defined on
+     * @param selectClause       raw selectors of the view's SELECT clause
+     * @param notNullWhereClause raw column names listed in the WHERE clause; may be null
+     * @param partitionKeys      raw column names forming the view's partition key
+     * @param clusteringKeys     raw column names forming the view's clustering columns
+     * @param ifNotExists        when true, an already existing view is not reported as an error
+     */
+    public CreateViewStatement(CFName viewName,
+                               CFName baseName,
+                               List<RawSelector> selectClause,
+                               List<ColumnIdentifier.Raw> notNullWhereClause,
+                               List<ColumnIdentifier.Raw> partitionKeys,
+                               List<ColumnIdentifier.Raw> clusteringKeys,
+                               boolean ifNotExists)
+    {
+        super(viewName);
+
+        // what the view is built from
+        this.baseName = baseName;
+        this.selectClause = selectClause;
+        this.notNullWhereClause = notNullWhereClause;
+
+        // layout of the view's primary key
+        this.partitionKeys = partitionKeys;
+        this.clusteringKeys = clusteringKeys;
+
+        this.ifNotExists = ifNotExists;
+    }
+
+
+ public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+ {
+ if (!baseName.hasKeyspace())
+ baseName.setKeyspace(keyspace(), true);
+ state.hasColumnFamilyAccess(keyspace(), baseName.getColumnFamily(), Permission.ALTER);
+ }
+
+ /**
+ * Intentionally performs no checks: the validation of this statement is carried out in
+ * {@link #announceMigration(boolean)} instead.
+ */
+ public void validate(ClientState state) throws RequestValidationException
+ {
+ // We do validation in announceMigration to reduce doubling up of work
+ }
+
+ private interface AddColumn {
+ void add(ColumnIdentifier identifier, AbstractType<?> type);
+ }
+
+    /**
+     * Hands every column of {@code columns} to {@code adder}, together with the type the column
+     * has in the base table. When an ordering was defined for the column in the view properties,
+     * the type is wrapped in or unwrapped from {@link ReversedType} so that it matches that ordering.
+     *
+     * @param baseCfm metadata of the base table, used to look up each column's type
+     * @param columns the columns to add
+     * @param adder   receives each column with its (possibly adjusted) type
+     */
+    private void add(CFMetaData baseCfm, Iterable<ColumnIdentifier> columns, AddColumn adder)
+    {
+        for (ColumnIdentifier column : columns)
+        {
+            AbstractType<?> type = baseCfm.getColumnDefinition(column).type;
+            if (properties.definedOrdering.containsKey(column))
+            {
+                boolean descending = properties.definedOrdering.get(column);
+                boolean reversed = type.isReversed();
+
+                // Only touch the type when the requested ordering disagrees with the current one.
+                if (descending != reversed)
+                {
+                    if (descending)
+                        type = ReversedType.getInstance(type);
+                    else
+                        type = ((ReversedType)type).baseType;
+                }
+            }
+            adder.add(column, type);
+        }
+    }
+
+    /**
+     * Validates the view definition against the base table and, when it is acceptable, announces
+     * the new view through {@link MigrationManager}.
+     *
+     * @param isLocalOnly forwarded unchanged to {@code MigrationManager.announceNewView}
+     * @return true if the view was announced; false if it already existed and IF NOT EXISTS was given
+     * @throws RequestValidationException if any of the checks below fails, or if the view already
+     *         exists and IF NOT EXISTS was not given
+     */
+    public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        // Checks performed below, in order:
+        // - the view properties are valid and COMPACT STORAGE is not requested
+        // - the base table is in the same keyspace and is neither a counter table nor a view
+        // - the base table's gc_grace_seconds is not 0
+        // - the select clause only contains plain, un-aliased columns of the base table
+        // - the view primary key has no duplicate, unknown, multi-cell or static column
+        // - at most one view primary key column is not part of the base primary key, and view
+        //   primary key columns are filtered by IS NOT NULL (see getColumnIdentifier)
+        // - the view primary key includes all columns of the base table's primary key
+
+        properties.validate();
+
+        if (properties.useCompactStorage)
+            throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view");
+
+        // We enforce the keyspace because if the RF is different, the logic to wait for a
+        // specific replica would break
+        if (!baseName.getKeyspace().equals(keyspace()))
+            throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace");
+
+        CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily());
+
+        if (cfm.isCounter())
+            throw new InvalidRequestException("Materialized views are not supported on counter tables");
+        if (cfm.isView())
+            throw new InvalidRequestException("Materialized views cannot be created against other materialized views");
+
+        if (cfm.params.gcGraceSeconds == 0)
+        {
+            throw new InvalidRequestException(String.format("Cannot create materialized view '%s' for base table " +
+                                                            "'%s' with gc_grace_seconds of 0, since this value is " +
+                                                            "used to TTL undelivered updates. Setting gc_grace_seconds" +
+                                                            " too low might cause undelivered updates to expire " +
+                                                            "before being replayed.", cfName.getColumnFamily(),
+                                                            baseName.getColumnFamily()));
+        }
+
+        // Columns explicitly named in the select clause. Static columns are skipped with a client warning.
+        Set<ColumnIdentifier> included = new HashSet<>();
+        for (RawSelector selector : selectClause)
+        {
+            Selectable.Raw selectable = selector.selectable;
+            if (selectable instanceof Selectable.WithFieldSelection.Raw)
+                throw new InvalidRequestException("Cannot select out a part of type when defining a materialized view");
+            if (selectable instanceof Selectable.WithFunction.Raw)
+                throw new InvalidRequestException("Cannot use function when defining a materialized view");
+            if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
+                throw new InvalidRequestException("Cannot use function when defining a materialized view");
+            ColumnIdentifier identifier = (ColumnIdentifier) selectable.prepare(cfm);
+            if (selector.alias != null)
+                throw new InvalidRequestException(String.format("Cannot alias column '%s' as '%s' when defining a materialized view", identifier.toString(), selector.alias.toString()));
+
+            ColumnDefinition cdef = cfm.getColumnDefinition(identifier);
+
+            if (cdef == null)
+                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+            if (cdef.isStatic())
+                ClientWarn.warn(String.format("Unable to include static column '%s' in Materialized View SELECT statement", identifier));
+            else
+                included.add(identifier);
+        }
+
+        // Every column of the view's primary key must be a known, single-cell, non-static base column
+        // and may appear only once.
+        Set<ColumnIdentifier.Raw> targetPrimaryKeys = new HashSet<>();
+        for (ColumnIdentifier.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys))
+        {
+            if (!targetPrimaryKeys.add(identifier))
+                throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier);
+
+            ColumnDefinition cdef = cfm.getColumnDefinition(identifier.prepare(cfm));
+
+            if (cdef == null)
+                throw new InvalidRequestException("Unknown column name detected in CREATE MATERIALIZED VIEW statement : "+identifier);
+
+            // Reuse the definition looked up above instead of resolving the column a second time.
+            if (cdef.type.isMultiCell())
+                throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));
+
+            if (cdef.isStatic())
+                throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier));
+        }
+
+        Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
+        for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns()))
+            basePrimaryKeyCols.add(definition.name);
+
+        List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>();
+        List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>();
+        Set<ColumnIdentifier> notNullColumns = new HashSet<>();
+        if (notNullWhereClause != null)
+        {
+            for (ColumnIdentifier.Raw raw : notNullWhereClause)
+            {
+                notNullColumns.add(raw.prepare(cfm));
+            }
+        }
+
+        // Tracks whether a column outside the base primary key has already been used in the view's
+        // primary key. It is accumulated with |= so that a base primary key column processed later
+        // cannot reset it, which would let a second non-PK column slip through.
+        boolean hasNonPKColumn = false;
+        for (ColumnIdentifier.Raw raw : partitionKeys)
+        {
+            hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, notNullColumns);
+        }
+
+        for (ColumnIdentifier.Raw raw : clusteringKeys)
+        {
+            hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, notNullColumns);
+        }
+
+        // We need to include all of the primary key columns from the base table in order to make sure that we do not
+        // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
+        // the view because if we need to generate a tombstone, we have no way of knowing which value is currently being
+        // used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require
+        // that they include all of the columns. We provide them with a list of all of the columns left to include.
+        boolean missingClusteringColumns = false;
+        StringBuilder columnNames = new StringBuilder();
+        List<ColumnIdentifier> includedColumns = new ArrayList<>();
+        for (ColumnDefinition def : cfm.allColumns())
+        {
+            ColumnIdentifier identifier = def.name;
+
+            // An empty selection means every non-static base column is included. Columns already
+            // placed in the view's primary key are not added again as regular columns.
+            if ((included.isEmpty() || included.contains(identifier))
+                && !targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier)
+                && !def.isStatic())
+            {
+                includedColumns.add(identifier);
+            }
+            if (!def.isPrimaryKeyColumn()) continue;
+
+            // A base primary key column that is absent from the view's primary key is reported below.
+            if (!targetClusteringColumns.contains(identifier) && !targetPartitionKeys.contains(identifier))
+            {
+                if (missingClusteringColumns)
+                    columnNames.append(',');
+                else
+                    missingClusteringColumns = true;
+                columnNames.append(identifier);
+            }
+        }
+        if (missingClusteringColumns)
+            throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)",
+                                                            columnFamily(), baseName.getColumnFamily(), columnNames.toString()));
+
+        if (targetPartitionKeys.isEmpty())
+            throw new InvalidRequestException("Must select at least a column for a Materialized View");
+
+        // NOTE(review): this rejects any view whose primary key has no clustering column, while the
+        // message talks about columns other than the primary key - confirm the intended condition.
+        if (targetClusteringColumns.isEmpty())
+            throw new InvalidRequestException("No columns are defined for Materialized View other than primary key");
+
+        CFMetaData.Builder cfmBuilder = CFMetaData.Builder.createView(keyspace(), columnFamily());
+        add(cfm, targetPartitionKeys, cfmBuilder::addPartitionKey);
+        add(cfm, targetClusteringColumns, cfmBuilder::addClusteringColumn);
+        add(cfm, includedColumns, cfmBuilder::addRegularColumn);
+        TableParams params = properties.properties.asNewTableParams();
+        CFMetaData viewCfm = cfmBuilder.build().params(params);
+        ViewDefinition definition = new ViewDefinition(keyspace(),
+                                                       columnFamily(),
+                                                       Schema.instance.getId(keyspace(), baseName.getColumnFamily()),
+                                                       included.isEmpty(),
+                                                       viewCfm);
+
+        try
+        {
+            MigrationManager.announceNewView(definition, isLocalOnly);
+        }
+        catch (AlreadyExistsException e)
+        {
+            // IF NOT EXISTS turns "already exists" into a no-op instead of an error.
+            if (ifNotExists)
+                return false;
+            throw e;
+        }
+
+        return true;
+    }
+
+    /**
+     * Resolves one column of the view's primary key, validates it and appends it to {@code columns}.
+     *
+     * @param cfm              metadata of the base table
+     * @param basePK           names of the base table's primary key columns
+     * @param hasNonPKColumn   whether a previously processed view primary key column was not part
+     *                         of the base primary key
+     * @param raw              raw column name, resolved against {@code cfm}
+     * @param columns          output list the resolved identifier is appended to
+     * @param allowedPKColumns columns filtered by IS NOT NULL; the resolved identifier is removed
+     *                         from this set
+     * @return true if this column, or any previously processed one, is not part of the base
+     *         primary key, so the flag stays set once a non-primary-key column has been seen
+     * @throws InvalidRequestException if this would be a second non-primary-key column, or if the
+     *         column is not filtered by IS NOT NULL and is not the sole partition key column of
+     *         the base table
+     */
+    private static boolean getColumnIdentifier(CFMetaData cfm,
+                                               Set<ColumnIdentifier> basePK,
+                                               boolean hasNonPKColumn,
+                                               ColumnIdentifier.Raw raw,
+                                               List<ColumnIdentifier> columns,
+                                               Set<ColumnIdentifier> allowedPKColumns)
+    {
+        ColumnIdentifier identifier = raw.prepare(cfm);
+
+        boolean isPk = basePK.contains(identifier);
+        if (!isPk && hasNonPKColumn)
+        {
+            // This helper is used for partition key and clustering columns alike, so the message
+            // speaks of the primary key as a whole.
+            throw new InvalidRequestException(String.format("Cannot include more than one non-primary key column '%s' in materialized view primary key", identifier));
+        }
+
+        // We don't need to include the "IS NOT NULL" filter on a non-composite partition key
+        // because we will never allow a single partition key to be NULL
+        boolean isSinglePartitionKey = cfm.getColumnDefinition(identifier).isPartitionKey()
+                                       && cfm.partitionKeyColumns().size() == 1;
+        if (!allowedPKColumns.remove(identifier) && !isSinglePartitionKey)
+        {
+            throw new InvalidRequestException(String.format("Primary key column '%s' is required to be filtered by 'IS NOT NULL'", identifier));
+        }
+
+        columns.add(identifier);
+
+        // Carry the incoming flag forward: returning only !isPk would reset it as soon as a base
+        // primary key column follows a non-primary-key one.
+        return hasNonPKColumn || !isPk;
+    }
+
+    /**
+     * Describes the schema change produced by this statement: a CREATED event with the TABLE
+     * target, carrying this statement's keyspace and view name.
+     */
+    public Event.SchemaChange changeEvent()
+    {
+        Event.SchemaChange.Change change = Event.SchemaChange.Change.CREATED;
+        Event.SchemaChange.Target target = Event.SchemaChange.Target.TABLE;
+        return new Event.SchemaChange(change, target, keyspace(), columnFamily());
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
deleted file mode 100644
index 8adba45..0000000
--- a/src/java/org/apache/cassandra/cql3/statements/DropMaterializedViewStatement.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.statements;
-
-import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.CFName;
-import org.apache.cassandra.db.view.MaterializedView;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.transport.Event;
-
-public class DropMaterializedViewStatement extends SchemaAlteringStatement
-{
- public final boolean ifExists;
-
- public DropMaterializedViewStatement(CFName cf, boolean ifExists)
- {
- super(cf);
- this.ifExists = ifExists;
- }
-
- public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
- {
- CFMetaData baseTable = MaterializedView.findBaseTable(keyspace(), columnFamily());
- if (baseTable != null)
- state.hasColumnFamilyAccess(keyspace(), baseTable.cfName, Permission.ALTER);
- }
-
- public void validate(ClientState state)
- {
- // validated in findIndexedCf()
- }
-
- public Event.SchemaChange changeEvent()
- {
- return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
- }
-
- public boolean announceMigration(boolean isLocalOnly) throws InvalidRequestException, ConfigurationException
- {
- try
- {
- CFMetaData viewCfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
- if (viewCfm == null)
- throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", columnFamily(), keyspace()));
- if (!viewCfm.isMaterializedView())
- throw new ConfigurationException(String.format("Cannot drop non materialized view '%s' in keyspace '%s'", columnFamily(), keyspace()));
-
- CFMetaData baseCfm = MaterializedView.findBaseTable(keyspace(), columnFamily());
- if (baseCfm == null)
- {
- if (ifExists)
- throw new ConfigurationException(String.format("Cannot drop materialized view '%s' in keyspace '%s' without base CF.", columnFamily(), keyspace()));
- else
- throw new InvalidRequestException(String.format("View '%s' could not be found in any of the tables of keyspace '%s'", cfName, keyspace()));
- }
-
- CFMetaData updatedCfm = baseCfm.copy();
- updatedCfm.materializedViews(updatedCfm.getMaterializedViews().without(columnFamily()));
- MigrationManager.announceColumnFamilyUpdate(updatedCfm, false, isLocalOnly);
- MigrationManager.announceColumnFamilyDrop(keyspace(), columnFamily(), isLocalOnly);
- return true;
- }
- catch (ConfigurationException e)
- {
- if (ifExists)
- return false;
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
index 35dc947..14d89d9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
@@ -19,12 +19,13 @@ package org.apache.cassandra.cql3.statements;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.CFName;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.transport.Event;
@@ -61,20 +62,24 @@ public class DropTableStatement extends SchemaAlteringStatement
{
try
{
- CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily());
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace());
+ CFMetaData cfm = ksm.tables.getNullable(columnFamily());
if (cfm != null)
{
- if (cfm.isMaterializedView())
+ if (cfm.isView())
throw new InvalidRequestException("Cannot use DROP TABLE on Materialized View");
boolean rejectDrop = false;
StringBuilder messageBuilder = new StringBuilder();
- for (MaterializedViewDefinition def : cfm.getMaterializedViews())
+ for (ViewDefinition def : ksm.views)
{
- if (rejectDrop)
- messageBuilder.append(',');
- rejectDrop = true;
- messageBuilder.append(def.viewName);
+ if (def.baseTableId.equals(cfm.cfId))
+ {
+ if (rejectDrop)
+ messageBuilder.append(',');
+ rejectDrop = true;
+ messageBuilder.append(def.viewName);
+ }
}
if (rejectDrop)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index 75f6200..74c8c36 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -87,7 +87,7 @@ public class DropTypeStatement extends SchemaAlteringStatement
if (!ut.name.equals(name.getUserTypeName()) && isUsedBy(ut))
throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by user type %s", name, ut.asCQL3Type()));
- for (CFMetaData cfm : ksm.tables)
+ for (CFMetaData cfm : ksm.tablesAndViews())
for (ColumnDefinition def : cfm.allColumns())
if (isUsedBy(def.type))
throw new InvalidRequestException(String.format("Cannot drop user type %s as it is still used by table %s.%s", name, cfm.ksName, cfm.cfName));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
new file mode 100644
index 0000000..f2be370
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropViewStatement.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
+import org.apache.cassandra.cql3.CFName;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.Event;
+
+public class DropViewStatement extends SchemaAlteringStatement
+{
+ public final boolean ifExists;
+
+ public DropViewStatement(CFName cf, boolean ifExists)
+ {
+ super(cf);
+ this.ifExists = ifExists;
+ }
+
+ public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
+ {
+ CFMetaData baseTable = View.findBaseTable(keyspace(), columnFamily());
+ if (baseTable != null)
+ state.hasColumnFamilyAccess(keyspace(), baseTable.cfName, Permission.ALTER);
+ }
+
+ public void validate(ClientState state)
+ {
+ // validated in findIndexedCf()
+ }
+
+ public Event.SchemaChange changeEvent()
+ {
+ return new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
+ }
+
+ public boolean announceMigration(boolean isLocalOnly) throws InvalidRequestException, ConfigurationException
+ {
+ try
+ {
+// ViewDefinition view = Schema.instance.getViewDefinition(keyspace(), columnFamily());
+// if (view == null)
+// {
+// if (Schema.instance.getCFMetaData(keyspace(), columnFamily()) != null)
+// throw new ConfigurationException(String.format("Cannot drop table '%s' in keyspace '%s'.", columnFamily(), keyspace()));
+//
+// throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", columnFamily(), keyspace()));
+// }
+//
+// CFMetaData baseCfm = Schema.instance.getCFMetaData(view.baseTableId);
+// if (baseCfm == null)
+// {
+// if (ifExists)
+// throw new ConfigurationException(String.format("Cannot drop materialized view '%s' in keyspace '%s' without base CF.", columnFamily(), keyspace()));
+// else
+// throw new InvalidRequestException(String.format("View '%s' could not be found in any of the tables of keyspace '%s'", cfName, keyspace()));
+// }
+
+ MigrationManager.announceViewDrop(keyspace(), columnFamily(), isLocalOnly);
+ return true;
+ }
+ catch (ConfigurationException e)
+ {
+ if (ifExists)
+ return false;
+ throw e;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 3855b6a..a04af4c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,13 +21,13 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
-
-import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.MaterializedViewDefinition;
+import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.ColumnIdentifier.Raw;
import org.apache.cassandra.cql3.functions.Function;
@@ -38,10 +38,8 @@ import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
@@ -52,9 +50,9 @@ import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
/*
@@ -170,14 +168,9 @@ public abstract class ModificationStatement implements CQLStatement
return cfm.isCounter();
}
- public boolean isMaterializedView()
- {
- return cfm.isMaterializedView();
- }
-
- public boolean hasMaterializedViews()
+ public boolean isView()
{
- return !cfm.getMaterializedViews().isEmpty();
+ return cfm.isView();
}
public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
@@ -203,13 +196,16 @@ public abstract class ModificationStatement implements CQLStatement
if (hasConditions())
state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
- // MV updates need to get the current state from the table, and might update the materialized views
+ // MV updates need to get the current state from the table, and might update the views
// Require Permission.SELECT on the base table, and Permission.MODIFY on the views
- if (hasMaterializedViews())
+ Iterator<ViewDefinition> views = View.findAll(keyspace(), columnFamily()).iterator();
+ if (views.hasNext())
{
state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT);
- for (MaterializedViewDefinition view : cfm.getMaterializedViews())
- state.hasColumnFamilyAccess(keyspace(), view.viewName, Permission.MODIFY);
+ do
+ {
+ state.hasColumnFamilyAccess(keyspace(), views.next().viewName, Permission.MODIFY);
+ } while (views.hasNext());
}
for (Function function : getFunctions())
@@ -221,7 +217,7 @@ public abstract class ModificationStatement implements CQLStatement
checkFalse(hasConditions() && attrs.isTimestampSet(), "Cannot provide custom timestamp for conditional updates");
checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide custom timestamp for counter updates");
checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide custom TTL for counter updates");
- checkFalse(isMaterializedView(), "Cannot directly modify a materialized view");
+ checkFalse(isView(), "Cannot directly modify a materialized view");
}
public PartitionColumns updatedColumns()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7ad6c09..18e402b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.view.MaterializedView;
+import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.index.Index;
@@ -175,9 +175,9 @@ public class SelectStatement implements CQLStatement
public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
{
- if (cfm.isMaterializedView())
+ if (cfm.isView())
{
- CFMetaData baseTable = MaterializedView.findBaseTable(keyspace(), columnFamily());
+ CFMetaData baseTable = View.findBaseTable(keyspace(), columnFamily());
if (baseTable != null)
state.hasColumnFamilyAccess(keyspace(), baseTable.cfName, Permission.SELECT);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 5dd306a..66b3da0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -65,7 +65,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
try
{
CFMetaData metaData = Schema.instance.getCFMetaData(keyspace(), columnFamily());
- if (metaData.isMaterializedView())
+ if (metaData.isView())
throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead");
StorageProxy.truncateBlocking(keyspace(), columnFamily());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c7d8926..0d6d801 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -48,11 +48,11 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.view.ViewManager;
import org.apache.cassandra.db.lifecycle.*;
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.db.view.MaterializedViewManager;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -193,7 +193,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
public final SecondaryIndexManager indexManager;
- public final MaterializedViewManager materializedViewManager;
+ public final ViewManager.ForStore viewManager;
/* These are locally held copies to be changed from the config during runtime */
private volatile DefaultInteger minCompactionThreshold;
@@ -231,7 +231,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
indexManager.reload();
- materializedViewManager.reload();
// If the CF comparator has changed, we need to change the memtable,
// because the old one still aliases the previous comparator.
if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
@@ -377,7 +376,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
this.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
this.directories = directories;
this.indexManager = new SecondaryIndexManager(this);
- this.materializedViewManager = new MaterializedViewManager(this);
+ this.viewManager = keyspace.viewManager.forTable(metadata.cfId);
this.metric = new TableMetrics(this);
fileIndexGenerator.set(generation);
sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
@@ -513,7 +512,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.dropSSTables();
LifecycleTransaction.waitForDeletions();
indexManager.invalidateAllIndexesBlocking();
- materializedViewManager.invalidate();
invalidateCaches();
}
@@ -633,8 +631,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// must be called after all sstables are loaded since row cache merges all row versions
public void init()
{
- materializedViewManager.init();
-
if (!isRowCacheEnabled())
return;
@@ -1912,7 +1908,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// flush the CF being truncated before forcing the new segment
forceBlockingFlush();
- materializedViewManager.forceBlockingFlush();
+ viewManager.forceBlockingFlush();
// sleep a little to make sure that our truncatedAt comes after any sstable
// that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
@@ -1921,7 +1917,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
else
{
dumpMemtable();
- materializedViewManager.dumpMemtables();
+ viewManager.dumpMemtables();
}
Runnable truncateRunnable = new Runnable()
@@ -1940,7 +1936,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
indexManager.truncateAllIndexesBlocking(truncatedAt);
- materializedViewManager.truncateBlocking(truncatedAt);
+ viewManager.truncateBlocking(truncatedAt);
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
logger.debug("cleaning out row cache");
@@ -1974,7 +1970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews
- ? Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs())
+ ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs())
: concatWithIndexes();
for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 4661bae..1169b45 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.db.view.ViewManager;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.SecondaryIndexManager;
@@ -81,12 +81,13 @@ public class Keyspace
private volatile KeyspaceMetadata metadata;
//OpOrder is defined globally since we need to order writes across
- //Keyspaces in the case of MaterializedViews (batchlog of MV mutations)
+ //Keyspaces in the case of Views (batchlog of view mutations)
public static final OpOrder writeOrder = new OpOrder();
/* ColumnFamilyStore per column family */
private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
private volatile AbstractReplicationStrategy replicationStrategy;
+ public final ViewManager viewManager;
public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>()
{
@@ -305,11 +306,13 @@ public class Keyspace
createReplicationStrategy(metadata);
this.metric = new KeyspaceMetrics(this);
- for (CFMetaData cfm : metadata.tables)
+ this.viewManager = new ViewManager(this);
+ for (CFMetaData cfm : metadata.tablesAndViews())
{
logger.debug("Initializing {}.{}", getName(), cfm.cfName);
initCf(cfm.cfId, cfm.cfName, loadSSTables);
}
+ this.viewManager.reload();
}
private Keyspace(KeyspaceMetadata metadata)
@@ -317,6 +320,7 @@ public class Keyspace
this.metadata = metadata;
createReplicationStrategy(metadata);
this.metric = new KeyspaceMetrics(this);
+ this.viewManager = new ViewManager(this);
}
public static Keyspace mockKS(KeyspaceMetadata metadata)
@@ -418,11 +422,11 @@ public class Keyspace
throw new RuntimeException("Testing write failures");
Lock lock = null;
- boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false);
+ boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
if (requiresViewUpdate)
{
- lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey());
+ lock = ViewManager.acquireLockFor(mutation.key().getKey());
if (lock == null)
{
@@ -430,11 +434,11 @@ public class Keyspace
{
logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
Tracing.trace("Could not acquire MV lock");
- throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+ throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
}
else
{
- //This MV update can't happen right now. so rather than keep this thread busy
+ //This view update can't happen right now, so rather than keep this thread busy
// we will re-apply ourself to the queue and try again later
StageManager.getStage(Stage.MUTATION).execute(() -> {
if (writeCommitLog)
@@ -472,7 +476,7 @@ public class Keyspace
try
{
Tracing.trace("Creating materialized view mutations from base table replica");
- cfs.materializedViewManager.pushViewReplicaUpdates(upd, !isClReplay);
+ viewManager.pushViewReplicaUpdates(upd, !isClReplay);
}
catch (Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index d54ee8b..6b2585e 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -95,8 +95,8 @@ public final class SystemKeyspace
public static final String SSTABLE_ACTIVITY = "sstable_activity";
public static final String SIZE_ESTIMATES = "size_estimates";
public static final String AVAILABLE_RANGES = "available_ranges";
- public static final String MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS = "materialized_views_builds_in_progress";
- public static final String BUILT_MATERIALIZED_VIEWS = "built_materialized_views";
+ public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
+ public static final String BUILT_VIEWS = "built_views";
@Deprecated public static final String LEGACY_HINTS = "hints";
@Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
@@ -246,9 +246,9 @@ public final class SystemKeyspace
+ "ranges set<blob>,"
+ "PRIMARY KEY ((keyspace_name)))");
- private static final CFMetaData MaterializedViewsBuildsInProgress =
- compile(MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS,
- "materialized views builds current progress",
+ private static final CFMetaData ViewsBuildsInProgress =
+ compile(VIEWS_BUILDS_IN_PROGRESS,
+ "views builds current progress",
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "view_name text,"
@@ -256,9 +256,9 @@ public final class SystemKeyspace
+ "generation_number int,"
+ "PRIMARY KEY ((keyspace_name), view_name))");
- private static final CFMetaData BuiltMaterializedViews =
- compile(BUILT_MATERIALIZED_VIEWS,
- "built materialized views",
+ private static final CFMetaData BuiltViews =
+ compile(BUILT_VIEWS,
+ "built views",
"CREATE TABLE %s ("
+ "keyspace_name text,"
+ "view_name text,"
@@ -414,7 +414,7 @@ public final class SystemKeyspace
public static KeyspaceMetadata metadata()
{
- return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), tables(), Types.none(), functions());
+ return KeyspaceMetadata.create(NAME, KeyspaceParams.local(), tables(), Views.none(), Types.none(), functions());
}
private static Tables tables()
@@ -430,8 +430,8 @@ public final class SystemKeyspace
SSTableActivity,
SizeEstimates,
AvailableRanges,
- MaterializedViewsBuildsInProgress,
- BuiltMaterializedViews,
+ ViewsBuildsInProgress,
+ BuiltViews,
LegacyHints,
LegacyBatchlog,
LegacyKeyspaces,
@@ -531,61 +531,61 @@ public final class SystemKeyspace
public static boolean isViewBuilt(String keyspaceName, String viewName)
{
String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?";
- UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_MATERIALIZED_VIEWS), keyspaceName, viewName);
+ UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, viewName);
return !result.isEmpty();
}
- public static void setMaterializedViewBuilt(String keyspaceName, String viewName)
+ public static void setViewBuilt(String keyspaceName, String viewName)
{
String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)";
- executeInternal(String.format(req, NAME, BUILT_MATERIALIZED_VIEWS), keyspaceName, viewName);
- forceBlockingFlush(BUILT_MATERIALIZED_VIEWS);
+ executeInternal(String.format(req, NAME, BUILT_VIEWS), keyspaceName, viewName);
+ forceBlockingFlush(BUILT_VIEWS);
}
- public static void setMaterializedViewRemoved(String keyspaceName, String viewName)
+ public static void setViewRemoved(String keyspaceName, String viewName)
{
String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ?";
- executeInternal(String.format(buildReq, NAME, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
- forceBlockingFlush(MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS);
+ executeInternal(String.format(buildReq, NAME, VIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName);
+ forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?";
- executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZED_VIEWS), keyspaceName, viewName);
- forceBlockingFlush(BUILT_MATERIALIZED_VIEWS);
+ executeInternal(String.format(builtReq, NAME, BUILT_VIEWS), keyspaceName, viewName);
+ forceBlockingFlush(BUILT_VIEWS);
}
- public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber)
+ public static void beginViewBuild(String ksname, String viewName, int generationNumber)
{
- executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS),
+ executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", VIEWS_BUILDS_IN_PROGRESS),
ksname,
viewName,
generationNumber);
}
- public static void finishMaterializedViewBuildStatus(String ksname, String viewName)
+ public static void finishViewBuildStatus(String ksname, String viewName)
{
// We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed
- // materialized view build.
+ // view build.
// If we flush the delete first, we'll have to restart from the beginning.
- // Also, if the build succeeded, but the materialized view build failed, we will be able to skip the
- // materialized view build check next boot.
- setMaterializedViewBuilt(ksname, viewName);
- forceBlockingFlush(BUILT_MATERIALIZED_VIEWS);
- executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
- forceBlockingFlush(MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS);
+ // Also, if marking the view as built succeeded, but removing the build-in-progress record failed, we will be
+ // able to skip the view build check next boot.
+ setViewBuilt(ksname, viewName);
+ forceBlockingFlush(BUILT_VIEWS);
+ executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+ forceBlockingFlush(VIEWS_BUILDS_IN_PROGRESS);
}
- public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token)
+ public static void updateViewBuildStatus(String ksname, String viewName, Token token)
{
String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)";
- Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory();
- executeInternal(String.format(req, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
+ Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
+ executeInternal(String.format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token));
}
- public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName)
+ public static Pair<Integer, Token> getViewBuildStatus(String ksname, String viewName)
{
String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?";
- UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
+ UntypedResultSet queryResultSet = executeInternal(String.format(req, VIEWS_BUILDS_IN_PROGRESS), ksname, viewName);
if (queryResultSet == null || queryResultSet.isEmpty())
return null;
@@ -597,7 +597,7 @@ public final class SystemKeyspace
generation = row.getInt("generation_number");
if (row.has("last_key"))
{
- Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory();
+ Token.TokenFactory factory = ViewsBuildsInProgress.partitioner.getTokenFactory();
lastKey = factory.fromString(row.getString("last_key"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
index 20fb6a9..fdbe97d 100644
--- a/src/java/org/apache/cassandra/db/WriteType.java
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -25,5 +25,5 @@ public enum WriteType
COUNTER,
BATCH_LOG,
CAS,
- MATERIALIZED_VIEW;
+ VIEW;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5207f49..75d50e7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -45,7 +45,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.MaterializedViewBuilder;
+import org.apache.cassandra.db.view.ViewBuilder;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -1334,7 +1334,7 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder)
+ public Future<?> submitViewBuilder(final ViewBuilder builder)
{
Runnable runnable = new Runnable()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index f8f016c..a69622b 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -36,7 +36,7 @@ public enum OperationType
FLUSH("Flush"),
STREAM("Stream"),
WRITE("Write"),
- VIEW_BUILD("Materialized view build");
+ VIEW_BUILD("View build");
public final String type;
public final String fileName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
deleted file mode 100644
index 52034cc..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.statements.CFProperties;
-import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadOrderGroup;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.partitions.AbstractBTreePartition;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeRow;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.ColumnData;
-import org.apache.cassandra.db.rows.ComplexColumnData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.service.pager.QueryPager;
-
-/**
- * A Materialized View copies data from a base table into a view table which can be queried independently from the
- * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure
- * that if a view needs to be updated, the updates are properly created and fed into the view.
- *
- * This class does the job of translating the base row to the view row.
- *
- * It handles reading existing state and figuring out what tombstones need to be generated.
- *
- * createMutations below is the "main method"
- *
- */
-public class MaterializedView
-{
- /**
- * The columns should all be updated together, so we use this object as group.
- */
- private static class MVColumns
- {
- //These are the base column definitions in terms of the *views* partitioning.
- //Meaning we can see (for example) the partition key of the view contains a clustering key
- //from the base table.
- public final List<ColumnDefinition> partitionDefs;
- public final List<ColumnDefinition> primaryKeyDefs;
- public final List<ColumnDefinition> baseComplexColumns;
-
- private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns)
- {
- this.partitionDefs = partitionDefs;
- this.primaryKeyDefs = primaryKeyDefs;
- this.baseComplexColumns = baseComplexColumns;
- }
- }
-
- public final String name;
-
- private final ColumnFamilyStore baseCfs;
- private ColumnFamilyStore _viewCfs = null;
-
- private MVColumns columns;
-
- private final boolean viewHasAllPrimaryKeys;
- private final boolean includeAll;
- private MaterializedViewBuilder builder;
-
- public MaterializedView(MaterializedViewDefinition definition,
- ColumnFamilyStore baseCfs)
- {
- this.baseCfs = baseCfs;
-
- name = definition.viewName;
- includeAll = definition.includeAll;
-
- viewHasAllPrimaryKeys = updateDefinition(definition);
- }
-
- /**
- * Lazily fetch the CFS instance for the view.
- * We do this lazily to avoid initilization issues.
- *
- * @return The views CFS instance
- */
- public ColumnFamilyStore getViewCfs()
- {
- if (_viewCfs == null)
- _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name));
-
- return _viewCfs;
- }
-
-
- /**
- * Lookup column definitions in the base table that correspond to the view columns (should be 1:1)
- *
- * Notify caller if all primary keys in the view are ALL primary keys in the base. We do this to simplify
- * tombstone checks.
- *
- * @param columns a list of columns to lookup in the base table
- * @param definitions lists to populate for the base table definitions
- * @return true if all view PKs are also Base PKs
- */
- private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions)
- {
- boolean allArePrimaryKeys = true;
- for (ColumnIdentifier identifier : columns)
- {
- ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier);
- assert cdef != null : "Could not resolve column " + identifier.toString();
-
- for (List<ColumnDefinition> list : definitions)
- {
- list.add(cdef);
- }
-
- allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn();
- }
-
- return allArePrimaryKeys;
- }
-
- /**
- * This updates the columns stored which are dependent on the base CFMetaData.
- *
- * @return true if the view contains only columns which are part of the base's primary key; false if there is at
- * least one column which is not.
- */
- public boolean updateDefinition(MaterializedViewDefinition definition)
- {
- List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size());
- List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size()
- + definition.clusteringColumns.size());
- List<ColumnDefinition> baseComplexColumns = new ArrayList<>();
-
- // We only add the partition columns to the partitions list, but both partition columns and clustering
- // columns are added to the primary keys list
- boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs);
- boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs);
-
- for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
- {
- if (cdef.isComplex())
- {
- baseComplexColumns.add(cdef);
- }
- }
-
- this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns);
-
- return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns;
- }
-
- /**
- * Check to see if the update could possibly modify a view. Cases where the view may be updated are:
- * <ul>
- * <li>View selects all columns</li>
- * <li>Update contains any range tombstones</li>
- * <li>Update touches one of the columns included in the view</li>
- * </ul>
- *
- * If the update contains any range tombstones, there is a possibility that it will not touch a range that is
- * currently included in the view.
- *
- * @return true if {@param partition} modifies a column included in the view
- */
- public boolean updateAffectsView(AbstractBTreePartition partition)
- {
- // If we are including all of the columns, then any update will be included
- if (includeAll)
- return true;
-
- // If there are range tombstones, tombstones will also need to be generated for the materialized view
- // This requires a query of the base rows and generating tombstones for all of those values
- if (!partition.deletionInfo().isLive())
- return true;
-
- // Check each row for deletion or update
- for (Row row : partition)
- {
- if (row.hasComplexDeletion())
- return true;
- if (!row.deletion().isLive())
- return true;
-
- for (ColumnData data : row)
- {
- if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null)
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * Creates the clustering columns for the view based on the specified row and resolver policy
- *
- * @param temporalRow The current row
- * @param resolver The policy to use when selecting versions of cells use
- * @return The clustering object to use for the view
- */
- private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver)
- {
- CFMetaData viewCfm = getViewCfs().metadata;
- int numViewClustering = viewCfm.clusteringColumns().size();
- CBuilder clustering = CBuilder.create(getViewCfs().getComparator());
- for (int i = 0; i < numViewClustering; i++)
- {
- ColumnDefinition definition = viewCfm.clusteringColumns().get(i);
- clustering.add(temporalRow.clusteringValue(definition, resolver));
- }
-
- return clustering.build();
- }
-
- /**
- * @return Mutation containing a range tombstone for a base partition key and TemporalRow.
- */
- private PartitionUpdate createTombstone(TemporalRow temporalRow,
- DecoratedKey partitionKey,
- Row.Deletion deletion,
- TemporalRow.Resolver resolver,
- int nowInSec)
- {
- CFMetaData viewCfm = getViewCfs().metadata;
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(viewClustering(temporalRow, resolver));
- builder.addRowDeletion(deletion);
- return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
- }
-
- /**
- * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier.
- */
- private PartitionUpdate createComplexTombstone(TemporalRow temporalRow,
- DecoratedKey partitionKey,
- ColumnDefinition deletedColumn,
- DeletionTime deletionTime,
- TemporalRow.Resolver resolver,
- int nowInSec)
- {
-
- CFMetaData viewCfm = getViewCfs().metadata;
- Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
- builder.newRow(viewClustering(temporalRow, resolver));
- builder.addComplexDeletion(deletedColumn, deletionTime);
- return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
- }
-
- /**
- * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from
- * the TemporalRow and its Resolver
- */
- private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver)
- {
- List<ColumnDefinition> partitionDefs = this.columns.partitionDefs;
- Object[] partitionKey = new Object[partitionDefs.size()];
-
- for (int i = 0; i < partitionKey.length; i++)
- {
- ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver);
-
- if (value == null)
- return null;
-
- partitionKey[i] = value;
- }
-
- CFMetaData metadata = getViewCfs().metadata;
- return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata
- .getKeyValidatorAsClusteringComparator()
- .make(partitionKey)));
- }
-
- /**
- * Builds the shadowable row tombstone for the view row that the referenced TemporalRow previously mapped to.
- *
- * @return update which contains the tombstone for the referenced TemporalRow, or null if not necessary: either
- * the view has all primary keys, or no non-primary-key column among the view's primary key definitions
- * has an old value. TemporalRow's can reference at most one view row; there will be at most one row to be
- * tombstoned, so only one update is necessary
- */
- private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow)
- {
- // Primary Key and Clustering columns do not generate tombstones
- if (viewHasAllPrimaryKeys)
- return null;
-
- boolean oldValuePresent = false;
- for (ColumnDefinition viewKeyColumn : this.columns.primaryKeyDefs)
- {
- if (viewKeyColumn.isPrimaryKeyColumn())
- continue;
-
- if (temporalRow.clusteringValue(viewKeyColumn, TemporalRow.oldValueIfUpdated) != null)
- oldValuePresent = true;
- }
-
- if (!oldValuePresent)
- return null;
-
- // The old view row is located with the earliest values and deleted at the view clustering timestamp
- TemporalRow.Resolver earliest = TemporalRow.earliest;
- DecoratedKey oldViewKey = viewPartitionKey(temporalRow, earliest);
- DeletionTime deletedAt = new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec);
- return createTombstone(temporalRow, oldViewKey, Row.Deletion.shadowable(deletedAt), earliest, temporalRow.nowInSec);
- }
-
- /**
- * Transforms the latest state of a TemporalRow into a single-row update against the view table.
- *
- * @return Update which is the transformed base table mutation for the materialized view, or null when no view
- * partition key can be resolved for the row.
- */
- private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow)
- {
- TemporalRow.Resolver latest = TemporalRow.latest;
-
- DecoratedKey viewKey = viewPartitionKey(temporalRow, latest);
- ColumnFamilyStore view = getViewCfs();
-
- // Not having a partition key means we aren't updating anything
- if (viewKey == null)
- return null;
-
- Row.Builder rowBuilder = BTreeRow.unsortedBuilder(temporalRow.nowInSec);
-
- // The view clustering is assembled from the latest value of each view clustering column, in order
- CBuilder viewClustering = CBuilder.create(view.getComparator());
- for (ColumnDefinition clusteringColumn : view.metadata.clusteringColumns())
- viewClustering.add(temporalRow.clusteringValue(clusteringColumn, latest));
-
- rowBuilder.newRow(viewClustering.build());
- rowBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(view.metadata,
- temporalRow.viewClusteringTimestamp(),
- temporalRow.viewClusteringTtl(),
- temporalRow.viewClusteringLocalDeletionTime()));
-
- // Only non-primary-key view columns contribute cells
- for (ColumnDefinition viewColumn : view.metadata.allColumns())
- {
- if (!viewColumn.isPrimaryKeyColumn())
- {
- for (Cell cell : temporalRow.values(viewColumn, latest))
- rowBuilder.addCell(cell);
- }
- }
-
- return PartitionUpdate.singleRowUpdate(view.metadata, viewKey, rowBuilder.build());
- }
-
- /**
- * Generates view tombstones for the deletions carried by {@code partition}: first the complex column deletions
- * found on its rows, then row tombstones for every temporal row whose deletion time (as computed against
- * {@code partition}) is not live.
- *
- * @param rowSet temporal rows of the base partition; rows read back from the base table are added to this set
- * @param partition Update which possibly contains deletion info for which to generate view tombstones.
- * @return View Tombstones which delete all of the rows which have been removed from the base table with
- * {@code partition}, or null if no tombstone had to be generated
- */
- private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition)
- {
- final TemporalRow.Resolver resolver = TemporalRow.earliest;
-
- DeletionInfo deletionInfo = partition.deletionInfo();
-
- List<Mutation> mutations = new ArrayList<>();
-
- // Check the complex columns to see if there are any which may have tombstones we need to create for the view
- if (!columns.baseComplexColumns.isEmpty())
- {
- for (Row row : partition)
- {
- if (!row.hasComplexDeletion())
- continue;
-
- TemporalRow temporalRow = rowSet.getClustering(row.clustering());
-
- assert temporalRow != null;
-
- for (ColumnDefinition definition : columns.baseComplexColumns)
- {
- ComplexColumnData columnData = row.getComplexColumnData(definition);
-
- if (columnData != null)
- {
- DeletionTime time = columnData.complexDeletion();
- if (!time.isLive())
- {
- // A null key means the view key could not be resolved for this row, so nothing is emitted
- DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver);
- if (targetKey != null)
- mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec)));
- }
- }
- }
- }
- }
-
- // Read used to load the existing base rows covered by the deletions; stays null when there is nothing to read
- ReadCommand command = null;
-
- if (!deletionInfo.isLive())
- {
- // We have to generate tombstones for all of the affected rows, but we don't have the information in order
- // to create them. This requires that we perform a read for the entire range that is being tombstoned, and
- // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an
- // entire partition of data which is not distributed on a single partition node.
- DecoratedKey dk = rowSet.dk;
-
- if (deletionInfo.hasRanges())
- {
- // One slice per range tombstone
- SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk);
- Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false);
- while (tombstones.hasNext())
- {
- RangeTombstone tombstone = tombstones.next();
-
- builder.addSlice(tombstone.deletedSlice());
- }
-
- command = builder.build();
- }
- else
- {
- // Deletion info is not live but has no ranges: read the whole base partition
- command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk);
- }
- }
-
- // The deletion info required no read: fall back to one slice per row that carries its own row deletion
- if (command == null)
- {
- SinglePartitionSliceBuilder builder = null;
- for (Row row : partition)
- {
- if (!row.deletion().isLive())
- {
- // The builder is created lazily so that no command is built when no row is deleted
- if (builder == null)
- builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
- builder.addSlice(Slice.make(row.clustering()));
- }
- }
-
- if (builder != null)
- command = builder.build();
- }
-
- if (command != null)
- {
-
- //We may have already done this work for
- //another MV update so check
-
- if (!rowSet.hasTombstonedExisting())
- {
- QueryPager pager = command.getPager(null);
-
- // Add all of the rows which were recovered from the query to the row set
- while (!pager.isExhausted())
- {
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
- {
- // An empty page ends the read; otherwise only the first partition of the page is consumed
- // (every command built above targets the single key rowSet.dk)
- if (!iter.hasNext())
- break;
-
- try (RowIterator rowIterator = iter.next())
- {
- while (rowIterator.hasNext())
- {
- Row row = rowIterator.next();
- rowSet.addRow(row, false);
- }
- }
- }
- }
-
- //In case we fetched nothing, avoid re-checking on another MV update
- rowSet.setTombstonedExisting();
- }
-
- // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone
- // for the view.
- for (TemporalRow temporalRow : rowSet)
- {
- DeletionTime deletionTime = temporalRow.deletionTime(partition);
- if (!deletionTime.isLive())
- {
- DecoratedKey value = viewPartitionKey(temporalRow, resolver);
- if (value != null)
- {
- PartitionUpdate update = createTombstone(temporalRow, value, Row.Deletion.regular(deletionTime), resolver, temporalRow.nowInSec);
- if (update != null)
- mutations.add(new Mutation(update));
- }
- }
- }
- }
-
- // Callers get null, not an empty collection, when there is nothing to apply
- return !mutations.isEmpty() ? mutations : null;
- }
-
- /**
- * Read and update temporal rows in the set which have corresponding values stored on the local node. One slice
- * is requested per temporal row, and every row returned is added to the set with the flag {@code false}.
- */
- private void readLocalRows(TemporalRow.Set rowSet)
- {
- SinglePartitionSliceBuilder sliceBuilder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
- for (TemporalRow pending : rowSet)
- sliceBuilder.addSlice(pending.baseSlice());
-
- ReadCommand command = sliceBuilder.build();
-
- // Page through the result 128 rows at a time until the pager reports exhaustion
- for (QueryPager pager = command.getPager(null); !pager.isExhausted(); )
- {
- try (ReadOrderGroup orderGroup = pager.startOrderGroup();
- PartitionIterator partitions = pager.fetchPageInternal(128, orderGroup))
- {
- while (partitions.hasNext())
- {
- try (RowIterator existingRows = partitions.next())
- {
- while (existingRows.hasNext())
- {
- Row existingRow = existingRows.next();
- rowSet.addRow(existingRow, false);
- }
- }
- }
- }
- }
- }
-
- /**
- * Creates a new TemporalRow.Set keyed by the partition's key and adds each row of the update to it with the
- * flag {@code true}.
- *
- * @return Set of rows which are contained in the partition update {@code partition}
- */
- private TemporalRow.Set separateRows(AbstractBTreePartition partition, Set<ColumnIdentifier> viewPrimaryKeyCols)
- {
- TemporalRow.Set updatedRows = new TemporalRow.Set(baseCfs, viewPrimaryKeyCols, partition.partitionKey().getKey());
-
- for (Row updatedRow : partition)
- {
- updatedRows.addRow(updatedRow, true);
- }
-
- return updatedRows;
- }
-
- /**
- * Splits the partition update up and adds the existing state to each row.
- * This data can be reused for multiple MV updates on the same base table
- *
- * @param partition the mutation
- * @param existing a row set previously computed for this update, or null; when given, it is re-keyed with this
- * view's primary key columns instead of being rebuilt
- * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
- * since all of the update will already be present in the base table.
- * @return The set of temporal rows contained in this update, or null if the update does not affect the view
- */
- public TemporalRow.Set getTemporalRowSet(AbstractBTreePartition partition, TemporalRow.Set existing, boolean isBuilding)
- {
- if (!updateAffectsView(partition))
- return null;
-
- List<ColumnDefinition> viewPrimaryKey = this.columns.primaryKeyDefs;
- Set<ColumnIdentifier> viewPrimaryKeyNames = new HashSet<>(viewPrimaryKey.size());
- for (ColumnDefinition keyColumn : viewPrimaryKey)
- viewPrimaryKeyNames.add(keyColumn.name);
-
- // Reuse the rows gathered earlier; only the view primary key differs
- if (existing != null)
- return existing.withNewViewPrimaryKey(viewPrimaryKeyNames);
-
- TemporalRow.Set freshRows = separateRows(partition, viewPrimaryKeyNames);
-
- // If we are building the view, we do not want to add old values; they will always be the same
- if (!isBuilding)
- readLocalRows(freshRows);
-
- return freshRows;
- }
-
-
- /**
- * Derives the view mutations for a base partition update: per temporal row an optional tombstone for the old
- * view row followed by the update for the new one, and finally the tombstones implied by the deletion info.
- *
- * @param partition the base table update being applied
- * @param rowSet the temporal rows computed for {@code partition}
- * @param isBuilding If the view is currently being built, we do not query the values which are already stored,
- * since all of the update will already be present in the base table.
- * @return View mutations which represent the changes necessary as long as previously created mutations for the view
- * have been applied successfully. This is based solely on the changes that are necessary given the current
- * state of the base table and the newly applying partition data. Null when the update does not affect the
- * view or when no mutation is required.
- */
- public Collection<Mutation> createMutations(AbstractBTreePartition partition, TemporalRow.Set rowSet, boolean isBuilding)
- {
- if (!updateAffectsView(partition))
- return null;
-
- Collection<Mutation> collected = new LinkedList<>();
- for (TemporalRow current : rowSet)
- {
- // If we are building, there is no need to check for partition tombstones; those values will not be present
- // in the partition data
- PartitionUpdate oldRowTombstone = isBuilding ? null : createRangeTombstoneForRow(current);
- if (oldRowTombstone != null)
- collected.add(new Mutation(oldRowTombstone));
-
- PartitionUpdate newRowUpdate = createUpdatesForInserts(current);
- if (newRowUpdate != null)
- collected.add(new Mutation(newRowUpdate));
- }
-
- if (!isBuilding)
- {
- Collection<Mutation> deletions = createForDeletionInfo(rowSet, partition);
- if (deletions != null)
- collected.addAll(deletions);
- }
-
- // Callers expect null, not an empty collection, when there is nothing to apply
- return collected.isEmpty() ? null : collected;
- }
-
- /**
- * Stops the builder currently held by this view, if any, then creates a new MaterializedViewBuilder for the
- * base table and submits it to the CompactionManager.
- */
- public synchronized void build()
- {
- MaterializedViewBuilder previous = this.builder;
- if (previous != null)
- {
- previous.stop();
- this.builder = null;
- }
-
- this.builder = new MaterializedViewBuilder(baseCfs, this);
- CompactionManager.instance.submitMaterializedViewBuilder(this.builder);
- }
-
- /**
- * Looks through the tables of the given keyspace for the one that has a materialized view with the given name.
- *
- * @return the first table whose materialized views contain {@code view}, or null if the keyspace is unknown
- * or no table has such a view
- */
- @Nullable
- public static CFMetaData findBaseTable(String keyspace, String view)
- {
- KeyspaceMetadata keyspaceMetadata = Schema.instance.getKSMetaData(keyspace);
- if (keyspaceMetadata != null)
- {
- for (CFMetaData candidate : keyspaceMetadata.tables)
- {
- if (candidate.getMaterializedViews().get(view).isPresent())
- return candidate;
- }
- }
-
- return null;
- }
-
- /**
- * Builds the view table's metadata from the view definition and its base table. Columns are added in this order:
- * view partition key columns, view clustering columns, base regular columns, then any base primary key column
- * that is part of neither the view's partition key nor its clustering columns (added as a regular column).
- * An empty {@code definition.included} means that all base columns are included.
- *
- * @param definition the view definition (name, partition columns, clustering columns, included columns)
- * @param baseCf metadata of the base table; every column named by the definition is expected to exist in it
- * @param properties supplies the (possibly reversed) key column types and the table params applied to the result
- * @return CFMetaData which represents the definition given
- */
- public static CFMetaData getCFMetaData(MaterializedViewDefinition definition,
- CFMetaData baseCf,
- CFProperties properties)
- {
- CFMetaData.Builder viewBuilder = CFMetaData.Builder
- .createView(baseCf.ksName, definition.viewName);
-
- // Last view partition column which is not a base partition key column; it must not be re-added as a regular
- ColumnDefinition nonPkTarget = null;
-
- for (ColumnIdentifier targetIdentifier : definition.partitionColumns)
- {
- ColumnDefinition target = baseCf.getColumnDefinition(targetIdentifier);
- // Same guard as for included columns below: fail with a clear message rather than a bare NPE
- assert target != null : "View partition column " + targetIdentifier + " is not defined in the base table";
- if (!target.isPartitionKey())
- nonPkTarget = target;
-
- viewBuilder.addPartitionKey(target.name, properties.getReversableType(targetIdentifier, target.type));
- }
-
- Collection<ColumnDefinition> included = new ArrayList<>();
- for(ColumnIdentifier identifier : definition.included)
- {
- ColumnDefinition cfDef = baseCf.getColumnDefinition(identifier);
- assert cfDef != null;
- included.add(cfDef);
- }
-
- boolean includeAll = included.isEmpty();
-
- for (ColumnIdentifier ident : definition.clusteringColumns)
- {
- ColumnDefinition column = baseCf.getColumnDefinition(ident);
- assert column != null : "View clustering column " + ident + " is not defined in the base table";
- viewBuilder.addClusteringColumn(ident, properties.getReversableType(ident, column.type));
- }
-
- for (ColumnDefinition column : baseCf.partitionColumns().regulars)
- {
- if (column != nonPkTarget && (includeAll || included.contains(column)))
- {
- viewBuilder.addRegularColumn(column.name, column.type);
- }
- }
-
- // Base primary key columns which are in neither the view's partition key nor its clustering columns are
- // carried over as regular columns of the view
- for (ColumnDefinition column : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns()))
- {
- if ( (!definition.partitionColumns.contains(column.name) && !definition.clusteringColumns.contains(column.name)) &&
- (includeAll || included.contains(column)) )
- {
- viewBuilder.addRegularColumn(column.name, column.type);
- }
- }
-
- return viewBuilder.build().params(properties.properties.asNewTableParams());
- }
-}