You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 16:35:38 UTC
[3/7] cassandra git commit: Improve MV schema representation
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/MaterializedViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MaterializedViews.java b/src/java/org/apache/cassandra/schema/MaterializedViews.java
deleted file mode 100644
index 1c55736..0000000
--- a/src/java/org/apache/cassandra/schema/MaterializedViews.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.schema;
-
-
-import java.util.Iterator;
-import java.util.Optional;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.cassandra.config.MaterializedViewDefinition;
-
-import static com.google.common.collect.Iterables.filter;
-
-public final class MaterializedViews implements Iterable<MaterializedViewDefinition>
-{
- private final ImmutableMap<String, MaterializedViewDefinition> materializedViews;
-
- private MaterializedViews(Builder builder)
- {
- materializedViews = builder.materializedViews.build();
- }
-
- public static Builder builder()
- {
- return new Builder();
- }
-
- public static MaterializedViews none()
- {
- return builder().build();
- }
-
- public Iterator<MaterializedViewDefinition> iterator()
- {
- return materializedViews.values().iterator();
- }
-
- public int size()
- {
- return materializedViews.size();
- }
-
- public boolean isEmpty()
- {
- return materializedViews.isEmpty();
- }
-
- /**
- * Get the materialized view with the specified name
- *
- * @param name a non-qualified materialized view name
- * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link MaterializedViewDefinition} otherwise
- */
- public Optional<MaterializedViewDefinition> get(String name)
- {
- return Optional.ofNullable(materializedViews.get(name));
- }
-
- /**
- * Create a MaterializedViews instance with the provided materialized view added
- */
- public MaterializedViews with(MaterializedViewDefinition materializedView)
- {
- if (get(materializedView.viewName).isPresent())
- throw new IllegalStateException(String.format("Materialized View %s already exists", materializedView.viewName));
-
- return builder().add(this).add(materializedView).build();
- }
-
- /**
- * Creates a MaterializedViews instance with the materializedView with the provided name removed
- */
- public MaterializedViews without(String name)
- {
- MaterializedViewDefinition materializedView =
- get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name)));
-
- return builder().add(filter(this, v -> v != materializedView)).build();
- }
-
- /**
- * Creates a MaterializedViews instance which contains an updated materialized view
- */
- public MaterializedViews replace(MaterializedViewDefinition materializedView)
- {
- return without(materializedView.viewName).with(materializedView);
- }
-
- @Override
- public boolean equals(Object o)
- {
- return this == o || (o instanceof MaterializedViews && materializedViews.equals(((MaterializedViews) o).materializedViews));
- }
-
- @Override
- public int hashCode()
- {
- return materializedViews.hashCode();
- }
-
- @Override
- public String toString()
- {
- return materializedViews.values().toString();
- }
-
- public static final class Builder
- {
- final ImmutableMap.Builder<String, MaterializedViewDefinition> materializedViews = new ImmutableMap.Builder<>();
-
- private Builder()
- {
- }
-
- public MaterializedViews build()
- {
- return new MaterializedViews(this);
- }
-
- public Builder add(MaterializedViewDefinition materializedView)
- {
- materializedViews.put(materializedView.viewName, materializedView);
- return this;
- }
-
- public Builder add(Iterable<MaterializedViewDefinition> materializedViews)
- {
- materializedViews.forEach(this::add);
- return this;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 55b841b..c922612 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -68,14 +68,14 @@ public final class SchemaKeyspace
public static final String COLUMNS = "columns";
public static final String DROPPED_COLUMNS = "dropped_columns";
public static final String TRIGGERS = "triggers";
- public static final String MATERIALIZED_VIEWS = "materialized_views";
+ public static final String VIEWS = "views";
public static final String TYPES = "types";
public static final String FUNCTIONS = "functions";
public static final String AGGREGATES = "aggregates";
public static final String INDEXES = "indexes";
public static final List<String> ALL =
- ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+ ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
private static final CFMetaData Keyspaces =
compile(KEYSPACES,
@@ -144,17 +144,31 @@ public final class SchemaKeyspace
+ "options frozen<map<text, text>>,"
+ "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
- private static final CFMetaData MaterializedViews =
- compile(MATERIALIZED_VIEWS,
- "materialized views definitions",
+ private static final CFMetaData Views =
+ compile(VIEWS,
+ "view definitions",
"CREATE TABLE %s ("
+ "keyspace_name text,"
- + "table_name text,"
+ "view_name text,"
- + "target_columns frozen<list<text>>,"
- + "clustering_columns frozen<list<text>>,"
- + "included_columns frozen<list<text>>,"
- + "PRIMARY KEY ((keyspace_name), table_name, view_name))");
+ + "base_table_id uuid,"
+ + "base_table_name text,"
+ + "bloom_filter_fp_chance double,"
+ + "caching frozen<map<text, text>>,"
+ + "comment text,"
+ + "compaction frozen<map<text, text>>,"
+ + "compression frozen<map<text, text>>,"
+ + "dclocal_read_repair_chance double,"
+ + "default_time_to_live int,"
+ + "extensions frozen<map<text, blob>>,"
+ + "gc_grace_seconds int,"
+ + "id uuid,"
+ + "include_all_columns boolean,"
+ + "max_index_interval int,"
+ + "memtable_flush_period_in_ms int,"
+ + "min_index_interval int,"
+ + "read_repair_chance double,"
+ + "speculative_retry text,"
+ + "PRIMARY KEY ((keyspace_name), view_name))");
private static final CFMetaData Indexes =
compile(INDEXES,
@@ -210,7 +224,7 @@ public final class SchemaKeyspace
+ "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
public static final List<CFMetaData> ALL_TABLE_METADATA =
- ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates, Indexes);
+ ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
private static CFMetaData compile(String name, String description, String schema)
{
@@ -265,9 +279,10 @@ public final class SchemaKeyspace
readSchemaPartitionForKeyspaceAndApply(TYPES, key,
types -> readSchemaPartitionForKeyspaceAndApply(TABLES, key,
- tables -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
+ tables -> readSchemaPartitionForKeyspaceAndApply(VIEWS, key,
+ views -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
functions -> readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
- aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, types, functions, aggregates)))))
+ aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, views, types, functions, aggregates))))))
);
}
}
@@ -473,6 +488,7 @@ public final class SchemaKeyspace
// current state of the schema
Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> oldViews = readSchemaForKeyspaces(VIEWS, keyspaces);
Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(TYPES, keyspaces);
Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
@@ -485,12 +501,14 @@ public final class SchemaKeyspace
// with new data applied
Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces);
+ Map<DecoratedKey, FilteredPartition> newViews = readSchemaForKeyspaces(VIEWS, keyspaces);
Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(TYPES, keyspaces);
Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
mergeTables(oldColumnFamilies, newColumnFamilies);
+ mergeViews(oldViews, newViews);
mergeTypes(oldTypes, newTypes);
mergeFunctions(oldFunctions, newFunctions);
mergeAggregates(oldAggregates, newAggregates);
@@ -546,6 +564,27 @@ public final class SchemaKeyspace
});
}
+ private static void mergeViews(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+ {
+ diffSchema(before, after, new Differ()
+ {
+ public void onDropped(UntypedResultSet.Row oldRow)
+ {
+ Schema.instance.dropView(oldRow.getString("keyspace_name"), oldRow.getString("view_name"));
+ }
+
+ public void onAdded(UntypedResultSet.Row newRow)
+ {
+ Schema.instance.addView(createViewFromViewRow(newRow));
+ }
+
+ public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+ {
+ Schema.instance.updateView(newRow.getString("keyspace_name"), newRow.getString("view_name"));
+ }
+ });
+ }
+
private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
{
diffSchema(before, after, new Differ()
@@ -697,6 +736,7 @@ public final class SchemaKeyspace
Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
keyspace.tables.forEach(table -> addTableToSchemaMutation(table, timestamp, true, mutation));
+ keyspace.views.forEach(view -> addViewToSchemaMutation(view, timestamp, true, mutation));
keyspace.types.forEach(type -> addTypeToSchemaMutation(type, timestamp, mutation));
keyspace.functions.udfs().forEach(udf -> addFunctionToSchemaMutation(udf, timestamp, mutation));
keyspace.functions.udas().forEach(uda -> addAggregateToSchemaMutation(uda, timestamp, mutation));
@@ -717,6 +757,7 @@ public final class SchemaKeyspace
private static KeyspaceMetadata createKeyspaceFromSchemaPartitions(RowIterator serializedParams,
RowIterator serializedTables,
+ RowIterator serializedViews,
RowIterator serializedTypes,
RowIterator serializedFunctions,
RowIterator serializedAggregates)
@@ -725,13 +766,14 @@ public final class SchemaKeyspace
KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(serializedParams);
Tables tables = createTablesFromTablesPartition(serializedTables);
+ Views views = createViewsFromViewsPartition(serializedViews);
Types types = createTypesFromPartition(serializedTypes);
Collection<UDFunction> udfs = createFunctionsFromFunctionsPartition(serializedFunctions);
Collection<UDAggregate> udas = createAggregatesFromAggregatesPartition(serializedAggregates);
Functions functions = org.apache.cassandra.schema.Functions.builder().add(udfs).add(udas).build();
- return KeyspaceMetadata.create(name, params, tables, types, functions);
+ return KeyspaceMetadata.create(name, params, tables, views, types, functions);
}
/**
@@ -849,9 +891,6 @@ public final class SchemaKeyspace
for (TriggerMetadata trigger : table.getTriggers())
addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
- for (MaterializedViewDefinition materializedView: table.getMaterializedViews())
- addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
-
for (IndexMetadata index : table.getIndexes())
addIndexToSchemaMutation(table, index, timestamp, mutation);
}
@@ -931,22 +970,6 @@ public final class SchemaKeyspace
for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values())
addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
- MapDifference<String, MaterializedViewDefinition> materializedViewDiff = materializedViewsDiff(oldTable.getMaterializedViews(), newTable.getMaterializedViews());
-
- // dropped materialized views
- for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnLeft().values())
- dropMaterializedViewFromSchemaMutation(oldTable, materializedView, timestamp, mutation);
-
- // newly created materialized views
- for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values())
- addMaterializedViewToSchemaMutation(newTable, materializedView, timestamp, mutation);
-
- // updated materialized views need to be updated
- for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values())
- {
- addUpdatedMaterializedViewDefinitionToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation);
- }
-
MapDifference<String, IndexMetadata> indexesDiff = indexesDiff(oldTable.getIndexes(),
newTable.getIndexes());
@@ -989,17 +1012,6 @@ public final class SchemaKeyspace
return Maps.difference(beforeMap, afterMap);
}
- private static MapDifference<String, MaterializedViewDefinition> materializedViewsDiff(MaterializedViews before, MaterializedViews after)
- {
- Map<String, MaterializedViewDefinition> beforeMap = new HashMap<>();
- before.forEach(v -> beforeMap.put(v.viewName, v));
-
- Map<String, MaterializedViewDefinition> afterMap = new HashMap<>();
- after.forEach(v -> afterMap.put(v.viewName, v));
-
- return Maps.difference(beforeMap, afterMap);
- }
-
public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
{
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -1013,9 +1025,6 @@ public final class SchemaKeyspace
for (TriggerMetadata trigger : table.getTriggers())
dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
- for (MaterializedViewDefinition materializedView : table.getMaterializedViews())
- dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation);
-
for (IndexMetadata index : table.getIndexes())
dropIndexFromSchemaMutation(table, index, timestamp, mutation);
@@ -1083,12 +1092,8 @@ public final class SchemaKeyspace
Triggers triggers =
readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
- MaterializedViews views =
- readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition);
-
CFMetaData cfm = createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
- .triggers(triggers)
- .materializedViews(views);
+ .triggers(triggers);
// the CFMetaData itself is required to build the collection of indexes as
// the column definitions are needed because we store only the name each
@@ -1114,7 +1119,6 @@ public final class SchemaKeyspace
boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
- boolean isMaterializedView = flags.contains(CFMetaData.Flag.VIEW);
return CFMetaData.create(keyspace,
table,
@@ -1123,7 +1127,7 @@ public final class SchemaKeyspace
isCompound,
isSuper,
isCounter,
- isMaterializedView,
+ false,
columns,
DatabaseDescriptor.getPartitioner())
.params(createTableParamsFromRow(row));
@@ -1274,82 +1278,164 @@ public final class SchemaKeyspace
}
/*
- * Materialized View metadata serialization/deserialization.
+ * View metadata serialization/deserialization.
*/
- private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+ public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
+ {
+ // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+ addViewToSchemaMutation(view, timestamp, true, mutation);
+ return mutation;
+ }
+
+ private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation)
{
- RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation)
- .clustering(table.cfName, materializedView.viewName);
+ RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, mutation)
+ .clustering(view.viewName);
+
+ CFMetaData table = view.metadata;
- builder.frozenList("target_columns", materializedView.partitionColumns.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()));
- builder.frozenList("clustering_columns", materializedView.clusteringColumns.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()));
- builder.frozenList("included_columns", materializedView.included.stream().map(ColumnIdentifier::toString).collect(Collectors.toList()));
+ builder.add("include_all_columns", view.includeAllColumns)
+ .add("base_table_id", view.baseTableId)
+ .add("base_table_name", view.baseTableMetadata().cfName)
+ .add("id", table.cfId);
+
+ addTableParamsToSchemaMutation(table.params, builder);
+
+ if (includeColumns)
+ {
+ for (ColumnDefinition column : table.allColumns())
+ addColumnToSchemaMutation(table, column, timestamp, mutation);
+
+ for (CFMetaData.DroppedColumn column : table.getDroppedColumns().values())
+ addDroppedColumnToSchemaMutation(table, column, timestamp, mutation);
+ }
builder.build();
}
- private static void dropMaterializedViewFromSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+ public static Mutation makeDropViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp)
{
- RowUpdateBuilder.deleteRow(MaterializedViews, timestamp, mutation, table.cfName, materializedView.viewName);
+ // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+
+ RowUpdateBuilder.deleteRow(Views, timestamp, mutation, view.viewName);
+
+ CFMetaData table = view.metadata;
+ for (ColumnDefinition column : table.allColumns())
+ dropColumnFromSchemaMutation(table, column, timestamp, mutation);
+
+ for (IndexMetadata index : table.getIndexes())
+ dropIndexFromSchemaMutation(table, index, timestamp, mutation);
+
+ return mutation;
}
- private static void addUpdatedMaterializedViewDefinitionToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
+ public static Mutation makeUpdateViewMutation(KeyspaceMetadata keyspace,
+ ViewDefinition oldView,
+ ViewDefinition newView,
+ long timestamp)
{
- addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
+ Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
+
+ addViewToSchemaMutation(newView, timestamp, false, mutation);
+
+ MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldView.metadata.getColumnMetadata(),
+ newView.metadata.getColumnMetadata());
+
+ // columns that are no longer needed
+ for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values())
+ {
+ dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, mutation);
+ }
+
+ // newly added columns
+ for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values())
+ addColumnToSchemaMutation(newView.metadata, column, timestamp, mutation);
+
+ // old columns with updated attributes
+ for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
+ addColumnToSchemaMutation(newView.metadata, newView.metadata.getColumnDefinition(name), timestamp, mutation);
+
+ // dropped columns
+ MapDifference<ByteBuffer, CFMetaData.DroppedColumn> droppedColumnDiff =
+ Maps.difference(oldView.metadata.getDroppedColumns(), newView.metadata.getDroppedColumns());
+
+ // newly dropped columns
+ for (CFMetaData.DroppedColumn column : droppedColumnDiff.entriesOnlyOnRight().values())
+ addDroppedColumnToSchemaMutation(oldView.metadata, column, timestamp, mutation);
+
+ // columns added then dropped again
+ for (ByteBuffer name : droppedColumnDiff.entriesDiffering().keySet())
+ addDroppedColumnToSchemaMutation(newView.metadata, newView.metadata.getDroppedColumns().get(name), timestamp, mutation);
+
+ return mutation;
+ }
+
+ public static ViewDefinition createViewFromName(String keyspace, String view)
+ {
+ return readSchemaPartitionForTableAndApply(VIEWS, keyspace, view, partition ->
+ {
+ if (partition.isEmpty())
+ throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, view));
+
+ return createViewFromViewPartition(partition);
+ });
+ }
+
+ private static ViewDefinition createViewFromViewPartition(RowIterator partition)
+ {
+ String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
+ return createViewFromViewRow(QueryProcessor.resultify(query, partition).one());
}
/**
- * Deserialize materialized views from storage-level representation.
+ * Deserialize views from storage-level representation.
*
- * @param partition storage-level partition containing the materialized view definitions
- * @return the list of processed MaterializedViewDefinitions
+ * @param partition storage-level partition containing the view definitions
+ * @return the list of processed ViewDefinitions
*/
- private static MaterializedViews createMaterializedViewsFromMaterializedViewsPartition(RowIterator partition)
+ private static Views createViewsFromViewsPartition(RowIterator partition)
{
- MaterializedViews.Builder views = org.apache.cassandra.schema.MaterializedViews.builder();
- String query = String.format("SELECT * FROM %s.%s", NAME, MATERIALIZED_VIEWS);
+ Views.Builder views = org.apache.cassandra.schema.Views.builder();
+ String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS);
for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
{
- MaterializedViewDefinition mv = createMaterializedViewFromMaterializedViewRow(row);
- views.add(mv);
+ ViewDefinition view = createViewFromViewRow(row);
+ views.add(view);
}
return views.build();
}
- private static MaterializedViewDefinition createMaterializedViewFromMaterializedViewRow(UntypedResultSet.Row row)
+ private static ViewDefinition createViewFromViewRow(UntypedResultSet.Row row)
{
- String name = row.getString("view_name");
- List<String> partitionColumnNames = row.getFrozenList("target_columns", UTF8Type.instance);
-
- String cfName = row.getString("table_name");
- List<String> clusteringColumnNames = row.getFrozenList("clustering_columns", UTF8Type.instance);
-
- List<ColumnIdentifier> partitionColumns = new ArrayList<>();
- for (String columnName : partitionColumnNames)
- {
- partitionColumns.add(ColumnIdentifier.getInterned(columnName, true));
- }
-
- List<ColumnIdentifier> clusteringColumns = new ArrayList<>();
- for (String columnName : clusteringColumnNames)
- {
- clusteringColumns.add(ColumnIdentifier.getInterned(columnName, true));
- }
+ String keyspace = row.getString("keyspace_name");
+ String view = row.getString("view_name");
+ UUID id = row.getUUID("id");
+ UUID baseTableId = row.getUUID("base_table_id");
+ boolean includeAll = row.getBoolean("include_all_columns");
- List<String> includedColumnNames = row.getFrozenList("included_columns", UTF8Type.instance);
- Set<ColumnIdentifier> includedColumns = new HashSet<>();
- if (includedColumnNames != null)
- {
- for (String columnName : includedColumnNames)
- includedColumns.add(ColumnIdentifier.getInterned(columnName, true));
- }
+ List<ColumnDefinition> columns =
+ readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition);
- return new MaterializedViewDefinition(cfName,
- name,
- partitionColumns,
- clusteringColumns,
- includedColumns);
+ Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns =
+ readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, view, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition);
+
+ CFMetaData cfm = CFMetaData.create(keyspace,
+ view,
+ id,
+ false,
+ true,
+ false,
+ false,
+ true,
+ columns,
+ DatabaseDescriptor.getPartitioner())
+ .params(createTableParamsFromRow(row))
+ .droppedColumns(droppedColumns);
+
+ return new ViewDefinition(keyspace, view, baseTableId, includeAll, cfm);
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/schema/Views.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Views.java b/src/java/org/apache/cassandra/schema/Views.java
new file mode 100644
index 0000000..5888b9d
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/Views.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ViewDefinition;
+
+import static com.google.common.collect.Iterables.filter;
+
+public final class Views implements Iterable<ViewDefinition>
+{
+ private final ImmutableMap<String, ViewDefinition> views;
+
+ private Views(Builder builder)
+ {
+ views = builder.views.build();
+ }
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ public static Views none()
+ {
+ return builder().build();
+ }
+
+ public Iterator<ViewDefinition> iterator()
+ {
+ return views.values().iterator();
+ }
+
+ public Iterable<CFMetaData> metadatas()
+ {
+ return Iterables.transform(views.values(), view -> view.metadata);
+ }
+
+ public int size()
+ {
+ return views.size();
+ }
+
+ public boolean isEmpty()
+ {
+ return views.isEmpty();
+ }
+
+ /**
+ * Get the view with the specified name
+ *
+ * @param name a non-qualified view name
+ * @return an empty {@link Optional} if the view name is not found; a non-empty optional of {@link ViewDefinition} otherwise
+ */
+ public Optional<ViewDefinition> get(String name)
+ {
+ return Optional.ofNullable(views.get(name));
+ }
+
+ /**
+ * Get the view with the specified name
+ *
+ * @param name a non-qualified view name
+ * @return null if the view name is not found; the found {@link ViewDefinition} otherwise
+ */
+ @Nullable
+ public ViewDefinition getNullable(String name)
+ {
+ return views.get(name);
+ }
+
+ /**
+ * Create a Views instance with the provided view added
+ */
+ public Views with(ViewDefinition view)
+ {
+ if (get(view.viewName).isPresent())
+ throw new IllegalStateException(String.format("Materialized View %s already exists", view.viewName));
+
+ return builder().add(this).add(view).build();
+ }
+
+ /**
+ * Creates a Views instance with the view with the provided name removed
+ */
+ public Views without(String name)
+ {
+ ViewDefinition materializedView =
+ get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exist", name)));
+
+ return builder().add(filter(this, v -> v != materializedView)).build();
+ }
+
+ /**
+ * Creates a Views instance which contains an updated view
+ */
+ public Views replace(ViewDefinition view, CFMetaData cfm)
+ {
+ return without(view.viewName).with(view);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return this == o || (o instanceof Views && views.equals(((Views) o).views));
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return views.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return views.values().toString();
+ }
+
+ public static final class Builder
+ {
+ final ImmutableMap.Builder<String, ViewDefinition> views = new ImmutableMap.Builder<>();
+
+ private Builder()
+ {
+ }
+
+ public Views build()
+ {
+ return new Views(this);
+ }
+
+
+ public Builder add(ViewDefinition view)
+ {
+ views.put(view.viewName, view);
+ return this;
+ }
+
+ public Builder add(Iterable<ViewDefinition> views)
+ {
+ views.forEach(this::add);
+ return this;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 230b46a..1408a70 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -230,7 +230,7 @@ public class CassandraDaemon
if (keyspaceName.equals(SystemKeyspace.NAME))
continue;
- for (CFMetaData cfm : Schema.instance.getTables(keyspaceName))
+ for (CFMetaData cfm : Schema.instance.getTablesAndViews(keyspaceName))
ColumnFamilyStore.scrubDataDirectories(cfm);
}
@@ -295,22 +295,19 @@ public class CassandraDaemon
}
}
- Runnable indexRebuild = new Runnable()
+ Runnable viewRebuild = new Runnable()
{
@Override
public void run()
{
for (Keyspace keyspace : Keyspace.all())
{
- for (ColumnFamilyStore cf: keyspace.getColumnFamilyStores())
- {
- cf.materializedViewManager.buildAllViews();
- }
+ keyspace.viewManager.buildAllViews();
}
}
};
- ScheduledExecutors.optionalTasks.schedule(indexRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+ ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
SystemKeyspace.finishStartup();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/MigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationListener.java b/src/java/org/apache/cassandra/service/MigrationListener.java
index 358b236..f4b3e7c 100644
--- a/src/java/org/apache/cassandra/service/MigrationListener.java
+++ b/src/java/org/apache/cassandra/service/MigrationListener.java
@@ -31,6 +31,11 @@ public abstract class MigrationListener
{
}
+ public void onCreateView(String ksName, String viewName)
+ {
+ onCreateColumnFamily(ksName, viewName);
+ }
+
public void onCreateUserType(String ksName, String typeName)
{
}
@@ -51,6 +56,11 @@ public abstract class MigrationListener
{
}
+ public void onUpdateView(String ksName, String viewName, boolean columnsDidChange)
+ {
+ onUpdateColumnFamily(ksName, viewName, columnsDidChange);
+ }
+
public void onUpdateUserType(String ksName, String typeName)
{
}
@@ -71,6 +81,10 @@ public abstract class MigrationListener
{
}
+ public void onDropView(String ksName, String viewName)
+ {
+ }
+
public void onDropUserType(String ksName, String typeName)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index dad6aa7..c820f18 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.cql3.functions.UDFunction;
import org.apache.cassandra.db.*;
@@ -162,6 +163,12 @@ public class MigrationManager
listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyCreateView(ViewDefinition view)
+ {
+ for (MigrationListener listener : listeners)
+ listener.onCreateView(view.ksName, view.viewName);
+ }
+
public void notifyCreateUserType(UserType ut)
{
for (MigrationListener listener : listeners)
@@ -192,6 +199,12 @@ public class MigrationManager
listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName, columnsDidChange);
}
+ public void notifyUpdateView(ViewDefinition view, boolean columnsDidChange)
+ {
+ for (MigrationListener listener : listeners)
+ listener.onUpdateView(view.ksName, view.viewName, columnsDidChange);
+ }
+
public void notifyUpdateUserType(UserType ut)
{
for (MigrationListener listener : listeners)
@@ -225,6 +238,12 @@ public class MigrationManager
listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
}
+ public void notifyDropView(ViewDefinition view)
+ {
+ for (MigrationListener listener : listeners)
+ listener.onDropView(view.ksName, view.viewName);
+ }
+
public void notifyDropUserType(UserType ut)
{
for (MigrationListener listener : listeners)
@@ -276,13 +295,28 @@ public class MigrationManager
KeyspaceMetadata ksm = Schema.instance.getKSMetaData(cfm.ksName);
if (ksm == null)
throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.cfName, cfm.ksName));
- else if (ksm.tables.get(cfm.cfName).isPresent())
+ // If we have a table or a view which has the same name, we can't add a new one
+ else if (ksm.getTableOrViewNullable(cfm.cfName) != null)
throw new AlreadyExistsException(cfm.ksName, cfm.cfName);
logger.info(String.format("Create new table: %s", cfm));
announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally);
}
+ public static void announceNewView(ViewDefinition view, boolean announceLocally) throws ConfigurationException
+ {
+ view.metadata.validate();
+
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName);
+ if (ksm == null)
+ throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", view.viewName, view.ksName));
+ else if (ksm.getTableOrViewNullable(view.viewName) != null)
+ throw new AlreadyExistsException(view.ksName, view.viewName);
+
+ logger.info(String.format("Create new view: %s", view));
+ announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally);
+ }
+
public static void announceNewType(UserType newType, boolean announceLocally)
{
KeyspaceMetadata ksm = Schema.instance.getKSMetaData(newType.keyspace);
@@ -340,6 +374,21 @@ public class MigrationManager
announce(SchemaKeyspace.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally);
}
+ public static void announceViewUpdate(ViewDefinition view, boolean announceLocally) throws ConfigurationException
+ {
+ view.metadata.validate();
+
+ ViewDefinition oldView = Schema.instance.getView(view.ksName, view.viewName);
+ if (oldView == null)
+ throw new ConfigurationException(String.format("Cannot update non existing materialized view '%s' in keyspace '%s'.", view.viewName, view.ksName));
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(view.ksName);
+
+ oldView.metadata.validateCompatility(view.metadata);
+
+ logger.info(String.format("Update view '%s/%s' From %s To %s", view.ksName, view.viewName, oldView, view));
+ announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, FBUtilities.timestampMicros()), announceLocally);
+ }
+
public static void announceTypeUpdate(UserType updatedType, boolean announceLocally)
{
announceNewType(updatedType, announceLocally);
@@ -376,6 +425,17 @@ public class MigrationManager
announce(SchemaKeyspace.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally);
}
+ public static void announceViewDrop(String ksName, String viewName, boolean announceLocally) throws ConfigurationException
+ {
+ ViewDefinition view = Schema.instance.getView(ksName, viewName);
+ if (view == null)
+ throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", viewName, ksName));
+ KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName);
+
+ logger.info(String.format("Drop table '%s/%s'", view.ksName, view.viewName));
+ announce(SchemaKeyspace.makeDropViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally);
+ }
+
public static void announceTypeDrop(UserType droppedType)
{
announceTypeDrop(droppedType, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index eca0c7e..16ff488 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -270,7 +270,7 @@ public class StartupChecks
// we do a one-off scrub of the system keyspace first; we can't load the list of the rest of the keyspaces,
// until system keyspace is opened.
- for (CFMetaData cfm : Schema.instance.getTables(SystemKeyspace.NAME))
+ for (CFMetaData cfm : Schema.instance.getTablesAndViews(SystemKeyspace.NAME))
ColumnFamilyStore.scrubDataDirectories(cfm);
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e3b884e..d209af6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -48,12 +48,9 @@ import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.view.MaterializedViewManager;
-import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.RingPosition;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.view.ViewManager;
+import org.apache.cassandra.db.view.ViewUtils;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
@@ -98,7 +95,7 @@ public class StorageProxy implements StorageProxyMBean
private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
- private static final MVWriteMetrics mvWriteMetrics = new MVWriteMetrics("MVWrite");
+ private static final ViewWriteMetrics viewWriteMetrics = new ViewWriteMetrics("ViewWrite");
private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
@@ -655,6 +652,7 @@ public class StorageProxy implements StorageProxyMBean
* @param mutations the mutations to be applied across the replicas
*/
public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog)
+ throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for mutation");
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -679,15 +677,15 @@ public class StorageProxy implements StorageProxyMBean
{
String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
- InetAddress pairedEndpoint = MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+ InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
- WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
- consistencyLevel,
- consistencyLevel,
- naturalEndpoints,
- WriteType.BATCH,
- cleanup);
+ WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation,
+ consistencyLevel,
+ consistencyLevel,
+ naturalEndpoints,
+ WriteType.BATCH,
+ cleanup);
// When local node is the endpoint and there are no pending nodes we can
// Just apply the mutation locally.
@@ -704,12 +702,12 @@ public class StorageProxy implements StorageProxyMBean
writeCommitLog);
// now actually perform the writes and wait for them to complete
- asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
+ asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
}
}
finally
{
- mvWriteMetrics.addNano(System.nanoTime() - startTime);
+ viewWriteMetrics.addNano(System.nanoTime() - startTime);
}
}
@@ -721,7 +719,9 @@ public class StorageProxy implements StorageProxyMBean
{
Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
- boolean updatesView = MaterializedViewManager.updatesAffectView(mutations, true);
+ boolean updatesView = Keyspace.open(mutations.iterator().next().getKeyspaceName())
+ .viewManager
+ .updatesAffectView(mutations, true);
if (augmented != null)
mutateAtomically(augmented, consistencyLevel, updatesView);
@@ -974,14 +974,14 @@ public class StorageProxy implements StorageProxyMBean
/**
* Same as performWrites except does not initiate writes (but does perform availability checks).
- * Keeps track of MVWriteMetrics
+ * Keeps track of ViewWriteMetrics
*/
- private static WriteResponseHandlerWrapper wrapMVBatchResponseHandler(Mutation mutation,
- ConsistencyLevel consistency_level,
- ConsistencyLevel batchConsistencyLevel,
- List<InetAddress> naturalEndpoints,
- WriteType writeType,
- BatchlogResponseHandler.BatchlogCleanup cleanup)
+ private static WriteResponseHandlerWrapper wrapViewBatchResponseHandler(Mutation mutation,
+ ConsistencyLevel consistency_level,
+ ConsistencyLevel batchConsistencyLevel,
+ List<InetAddress> naturalEndpoints,
+ WriteType writeType,
+ BatchlogResponseHandler.BatchlogCleanup cleanup)
{
Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -989,7 +989,7 @@ public class StorageProxy implements StorageProxyMBean
Token tk = mutation.key().getToken();
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
- BatchlogResponseHandler<IMutation> batchHandler = new MVWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+ BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
@@ -2282,20 +2282,20 @@ public class StorageProxy implements StorageProxyMBean
}
/**
- * This class captures metrics for materialized views writes.
+ * This class captures metrics for views writes.
*/
- private static class MVWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
+ private static class ViewWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
{
- public MVWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
+ public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
{
super(writeHandler, i, cleanup);
- mvWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+ viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
}
public void response(MessageIn<IMutation> msg)
{
super.response(msg);
- mvWriteMetrics.viewReplicasSuccess.inc();
+ viewWriteMetrics.viewReplicasSuccess.inc();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f095630..f0ad46f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -630,12 +630,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void runMayThrow() throws InterruptedException
{
inShutdownHook = true;
- ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
+ ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isShutdown()
&& counterMutationStage.isShutdown()
- && materializedViewMutationStage.isShutdown())
+ && viewMutationStage.isShutdown())
return; // drained already
if (daemon != null)
@@ -646,11 +646,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// In-progress writes originating here could generate hints to be written, so shut down MessagingService
// before mutation stage, so we can get all the hints saved before shutting down
MessagingService.instance().shutdown();
- materializedViewMutationStage.shutdown();
+ viewMutationStage.shutdown();
HintsService.instance.pauseDispatch();
counterMutationStage.shutdown();
mutationStage.shutdown();
- materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+ viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
@@ -3179,7 +3179,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (ksMetaData == null)
throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'");
- CFMetaData cfMetaData = ksMetaData.tables.getNullable(cf);
+ CFMetaData cfMetaData = ksMetaData.getTableOrViewNullable(cf);
if (cfMetaData == null)
throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
@@ -3876,11 +3876,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
HintsService.instance.pauseDispatch();
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
- ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
+ ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isTerminated()
&& counterMutationStage.isTerminated()
- && materializedViewMutationStage.isTerminated())
+ && viewMutationStage.isTerminated())
{
logger.warn("Cannot drain node (did it already happen?)");
return;
@@ -3894,10 +3894,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().shutdown();
setMode(Mode.DRAINING, "clearing mutation stage", false);
- materializedViewMutationStage.shutdown();
+ viewMutationStage.shutdown();
counterMutationStage.shutdown();
mutationStage.shutdown();
- materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+ viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index cb99654..f261954 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -21,6 +21,7 @@ import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,10 +34,10 @@ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.View;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -127,7 +128,7 @@ public class StreamReceiveTask extends StreamTask
return;
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- boolean hasMaterializedViews = cfs.materializedViewManager.allViews().iterator().hasNext();
+ boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
try
{
@@ -143,11 +144,11 @@ public class StreamReceiveTask extends StreamTask
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- //We have a special path for Materialized view.
- //Since the MV requires cleaning up any pre-existing state, we must put
+ //We have a special path for views.
+ //Since the view requires cleaning up any pre-existing state, we must put
//all partitions through the same write path as normal mutations.
- //This also ensures any 2i's are also updated
- if (hasMaterializedViews)
+ //This also ensures any 2is are also updated
+ if (hasViews)
{
for (SSTableReader reader : readers)
{
@@ -183,7 +184,7 @@ public class StreamReceiveTask extends StreamTask
{
//We don't keep the streamed sstables since we've applied them manually
//So we abort the txn and delete the streamed sstables
- if (hasMaterializedViews)
+ if (hasViews)
{
cfs.forceBlockingFlush();
task.txn.abort();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 207bb6f..7017bc1 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.*;
@@ -815,7 +816,7 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.MODIFY);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
- if (metadata.isMaterializedView())
+ if (metadata.isView())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
ThriftValidation.validateKey(metadata, key);
@@ -912,7 +913,7 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, column_family, Permission.SELECT);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family, false);
- if (metadata.isMaterializedView())
+ if (metadata.isView())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
ThriftValidation.validateKey(metadata, key);
@@ -1108,7 +1109,7 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, cfName, Permission.MODIFY);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, cfName);
- if (metadata.isMaterializedView())
+ if (metadata.isView())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
ThriftValidation.validateKey(metadata, key);
@@ -1325,7 +1326,7 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.MODIFY);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family, isCommutativeOp);
- if (metadata.isMaterializedView())
+ if (metadata.isView())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
ThriftValidation.validateKey(metadata, key);
@@ -1903,7 +1904,7 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, column_family, Permission.DROP);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
- if (metadata.isMaterializedView())
+ if (metadata.isView())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot drop Materialized Views from Thrift");
MigrationManager.announceColumnFamilyDrop(keyspace, column_family);
@@ -2015,9 +2016,9 @@ public class CassandraServer implements Cassandra.Iface
if (oldCfm == null)
throw new InvalidRequestException("Could not find table definition to modify.");
- if (oldCfm.isMaterializedView())
+ if (oldCfm.isView())
throw new InvalidRequestException("Cannot modify Materialized View table " + oldCfm.cfName + " as it may break the schema. You should use cqlsh to modify Materialized View tables instead.");
- if (!oldCfm.getMaterializedViews().isEmpty())
+ if (!Iterables.isEmpty(View.findAll(cf_def.keyspace, cf_def.name)))
throw new InvalidRequestException("Cannot modify table with Materialized View " + oldCfm.cfName + " as it may break the schema. You should use cqlsh to modify tables with Materialized Views instead.");
if (!oldCfm.isThriftCompatible())
@@ -2047,7 +2048,7 @@ public class CassandraServer implements Cassandra.Iface
String keyspace = cState.getKeyspace();
cState.hasColumnFamilyAccess(keyspace, cfname, Permission.MODIFY);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, cfname, false);
- if (metadata.isMaterializedView())
+ if (metadata.isView())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot truncate Materialized Views");
if (startSessionIfRequested())
@@ -2134,7 +2135,7 @@ public class CassandraServer implements Cassandra.Iface
cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.MODIFY);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
- if (metadata.isMaterializedView())
+ if (metadata.isView())
throw new org.apache.cassandra.exceptions.InvalidRequestException("Cannot modify Materialized Views directly");
ThriftValidation.validateKey(metadata, key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 86cfe42..b721226 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -162,7 +162,7 @@ public class ThriftConversion
public static KsDef toThrift(KeyspaceMetadata ksm)
{
List<CfDef> cfDefs = new ArrayList<>();
- for (CFMetaData cfm : ksm.tables)
+ for (CFMetaData cfm : ksm.tables) // do not include views
if (cfm.isThriftCompatible()) // Don't expose CF that cannot be correctly handle by thrift; see CASSANDRA-4377 for further details
cfDefs.add(toThrift(cfm));
@@ -272,8 +272,8 @@ public class ThriftConversion
defaultValidator);
}
- // We do not allow Thrift materialized views, so we always set it to false
- boolean isMaterializedView = false;
+ // We do not allow Thrift views, so we always set it to false
+ boolean isView = false;
CFMetaData newCFMD = CFMetaData.create(cf_def.keyspace,
cf_def.name,
@@ -282,7 +282,7 @@ public class ThriftConversion
isCompound,
isSuper,
isCounter,
- isMaterializedView,
+ isView,
defs,
DatabaseDescriptor.getPartitioner());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index d602076..8dff532 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -80,6 +80,8 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
}
tables.putAll(fetchTablesMetadata(keyspace, session, partitioner));
+ // We only need the CFMetaData for the views, so we only load that.
+ tables.putAll(fetchViewMetadata(keyspace, session, partitioner));
}
}
@@ -111,41 +113,61 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
for (Row row : session.execute(query, keyspace))
{
String name = row.getString("table_name");
- UUID id = row.getUUID("id");
-
- Set<CFMetaData.Flag> flags = row.isNull("flags")
- ? Collections.emptySet()
- : CFMetaData.flagsFromStrings(row.getSet("flags", String.class));
-
- boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
- boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
- boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
- boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
- boolean isMaterializedView = flags.contains(CFMetaData.Flag.VIEW);
-
- String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
- SchemaKeyspace.NAME,
- SchemaKeyspace.COLUMNS);
-
- List<ColumnDefinition> defs = new ArrayList<>();
- for (Row colRow : session.execute(columnsQuery, keyspace, name))
- defs.add(createDefinitionFromRow(colRow, keyspace, name));
-
- tables.put(name, CFMetaData.create(keyspace,
- name,
- id,
- isDense,
- isCompound,
- isSuper,
- isCounter,
- isMaterializedView,
- defs,
- partitioner));
+ tables.put(name, createTableMetadata(keyspace, session, partitioner, false, row, name));
}
return tables;
}
+ /*
+ * In the case where we are creating View CFMetaDatas, we only need the CFMetaData
+ * itself, so we load just the view rows from the schema views table and build the
+ * metadata the same way as for a regular table.
+ */
+ private static Map<String, CFMetaData> fetchViewMetadata(String keyspace, Session session, IPartitioner partitioner)
+ {
+ Map<String, CFMetaData> tables = new HashMap<>();
+ String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ?", SchemaKeyspace.NAME, SchemaKeyspace.VIEWS);
+
+ for (Row row : session.execute(query, keyspace))
+ {
+ String name = row.getString("view_name");
+ tables.put(name, createTableMetadata(keyspace, session, partitioner, true, row, name));
+ }
+
+ return tables;
+ }
+
+ private static CFMetaData createTableMetadata(String keyspace, Session session, IPartitioner partitioner, boolean isView, Row row, String name)
+ {
+ UUID id = row.getUUID("id");
+ Set<CFMetaData.Flag> flags = row.isNull("flags")
+ ? Collections.emptySet()
+ : CFMetaData.flagsFromStrings(row.getSet("flags", String.class));
+
+ boolean isSuper = flags.contains(CFMetaData.Flag.SUPER);
+ boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
+ boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
+ boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
+
+ String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?",
+ SchemaKeyspace.NAME,
+ SchemaKeyspace.COLUMNS);
+
+ List<ColumnDefinition> defs = new ArrayList<>();
+ for (Row colRow : session.execute(columnsQuery, keyspace, name))
+ defs.add(createDefinitionFromRow(colRow, keyspace, name));
+
+ return CFMetaData.create(keyspace,
+ name,
+ id,
+ isDense,
+ isCompound,
+ isSuper,
+ isCounter,
+ isView,
+ defs,
+ partitioner);
+ }
+
private static ColumnDefinition createDefinitionFromRow(Row row, String keyspace, String table)
{
ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3a8dbca/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
deleted file mode 100644
index b833e60..0000000
--- a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CyclicBarrier;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.exceptions.WriteTimeoutException;
-import org.apache.cassandra.concurrent.SEPExecutor;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.batchlog.BatchlogManager;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class MaterializedViewLongTest extends CQLTester
-{
- int protocolVersion = 4;
- private final List<String> materializedViews = new ArrayList<>();
-
- @BeforeClass
- public static void startup()
- {
- requireNetwork();
- }
- @Before
- public void begin()
- {
- materializedViews.clear();
- }
-
- @After
- public void end() throws Throwable
- {
- for (String viewName : materializedViews)
- executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + viewName);
- }
-
- private void createView(String name, String query) throws Throwable
- {
- executeNet(protocolVersion, String.format(query, name));
- // If exception is thrown, the view will not be added to the list; since it shouldn't have been created, this is
- // the desired behavior
- materializedViews.add(name);
- }
-
- @Test
- public void testConflictResolution() throws Throwable
- {
- final int writers = 96;
- final int insertsPerWriter = 50;
- final Map<Integer, Exception> failedWrites = new ConcurrentHashMap<>();
-
- createTable("CREATE TABLE %s (" +
- "a int," +
- "b int," +
- "c int," +
- "PRIMARY KEY (a, b))");
-
- executeNet(protocolVersion, "USE " + keyspace());
-
- createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE c IS NOT NULL AND a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (c, a, b)");
-
- CyclicBarrier semaphore = new CyclicBarrier(writers);
-
- Thread[] threads = new Thread[writers];
- for (int i = 0; i < writers; i++)
- {
- final int writer = i;
- Thread t = new Thread(new WrappedRunnable()
- {
- public void runMayThrow()
- {
- try
- {
- int writerOffset = writer * insertsPerWriter;
- semaphore.await();
- for (int i = 0; i < insertsPerWriter; i++)
- {
- try
- {
- executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES (?, ?, ?) USING TIMESTAMP 1",
- 1,
- 1,
- i + writerOffset);
- }
- catch (NoHostAvailableException|WriteTimeoutException e)
- {
- failedWrites.put(i + writerOffset, e);
- }
- }
- }
- catch (Throwable e)
- {
- throw new RuntimeException(e);
- }
- }
- });
- t.start();
- threads[i] = t;
- }
-
- for (int i = 0; i < writers; i++)
- threads[i].join();
-
- for (int i = 0; i < writers * insertsPerWriter; i++)
- {
- if (executeNet(protocolVersion, "SELECT COUNT(*) FROM system.batchlog").one().getLong(0) == 0)
- break;
- try
- {
- // This will throw exceptions whenever there are exceptions trying to push the materialized view values
- // out, caused by the view becoming overwhelmed.
- BatchlogManager.instance.startBatchlogReplay().get();
- }
- catch (Throwable ignore)
- {
-
- }
- }
-
- int value = executeNet(protocolVersion, "SELECT c FROM %s WHERE a = 1 AND b = 1").one().getInt("c");
-
- List<Row> rows = executeNet(protocolVersion, "SELECT c FROM " + keyspace() + ".mv").all();
-
- boolean containsC = false;
- StringBuilder others = new StringBuilder();
- StringBuilder overlappingFailedWrites = new StringBuilder();
- for (Row row : rows)
- {
- int c = row.getInt("c");
- if (c == value)
- containsC = true;
- else
- {
- if (others.length() != 0)
- others.append(' ');
- others.append(c);
- if (failedWrites.containsKey(c))
- {
- if (overlappingFailedWrites.length() != 0)
- overlappingFailedWrites.append(' ');
- overlappingFailedWrites.append(c)
- .append(':')
- .append(failedWrites.get(c).getMessage());
- }
- }
- }
-
- if (rows.size() > 1)
- {
- throw new AssertionError(String.format("Expected 1 row, but found %d; %s c = %d, and (%s) of which (%s) failed to insert", rows.size(), containsC ? "found row with" : "no rows contained", value, others, overlappingFailedWrites));
- }
- else if (rows.isEmpty())
- {
- throw new AssertionError(String.format("Could not find row with c = %d", value));
- }
- else if (rows.size() == 1 && !containsC)
- {
- throw new AssertionError(String.format("Single row had c = %d, expected %d", rows.get(0).getInt("c"), value));
- }
- }
-}