You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/05/18 15:46:20 UTC
[2/4] cassandra git commit: Implement virtual keyspace interface
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
new file mode 100644
index 0000000..6750215
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collection;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Tables;
+
+/**
+ * A named group of {@link VirtualTable}s together with the {@link KeyspaceMetadata} derived from them.
+ */
+public class VirtualKeyspace
+{
+    private final String name;
+    private final ImmutableCollection<VirtualTable> tables;
+    private final KeyspaceMetadata metadata;
+
+    /**
+     * Creates a virtual keyspace.
+     *
+     * @param name the keyspace name
+     * @param tables the virtual tables of the keyspace; they are copied into an immutable list
+     */
+    public VirtualKeyspace(String name, Collection<VirtualTable> tables)
+    {
+        this.name = name;
+        this.tables = ImmutableList.copyOf(tables);
+
+        // the keyspace metadata is assembled from the metadata of each supplied table
+        Tables tableDefinitions = Tables.of(Iterables.transform(tables, VirtualTable::metadata));
+        this.metadata = KeyspaceMetadata.virtual(name, tableDefinitions);
+    }
+
+    /**
+     * @return the keyspace name passed at construction
+     */
+    public String name()
+    {
+        return this.name;
+    }
+
+    /**
+     * @return the keyspace metadata built at construction
+     */
+    public KeyspaceMetadata metadata()
+    {
+        return this.metadata;
+    }
+
+    /**
+     * @return the immutable collection of tables of this keyspace
+     */
+    public ImmutableCollection<VirtualTable> tables()
+    {
+        return this.tables;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
new file mode 100644
index 0000000..5e0f90c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Singleton registry that maps keyspace names to {@link VirtualKeyspace}s and table ids to {@link VirtualTable}s.
+ */
+public final class VirtualKeyspaceRegistry
+{
+    public static final VirtualKeyspaceRegistry instance = new VirtualKeyspaceRegistry();
+
+    private final Map<String, VirtualKeyspace> virtualKeyspaces = new ConcurrentHashMap<>();
+    private final Map<TableId, VirtualTable> virtualTables = new ConcurrentHashMap<>();
+
+    private VirtualKeyspaceRegistry()
+    {
+    }
+
+    /**
+     * Records the keyspace under its name and each of its tables under the table's id.
+     *
+     * @param keyspace the virtual keyspace to register
+     */
+    public void register(VirtualKeyspace keyspace)
+    {
+        virtualKeyspaces.put(keyspace.name(), keyspace);
+        for (VirtualTable table : keyspace.tables())
+            virtualTables.put(table.metadata().id, table);
+    }
+
+    /**
+     * @param name a keyspace name
+     * @return the keyspace registered under that name, or {@code null} if there is none
+     */
+    @Nullable
+    public VirtualKeyspace getKeyspaceNullable(String name)
+    {
+        return virtualKeyspaces.get(name);
+    }
+
+    /**
+     * @param id a table id
+     * @return the table registered under that id, or {@code null} if there is none
+     */
+    @Nullable
+    public VirtualTable getTableNullable(TableId id)
+    {
+        return virtualTables.get(id);
+    }
+
+    /**
+     * @param name a keyspace name
+     * @return the metadata of the keyspace registered under that name, or {@code null} if there is none
+     */
+    @Nullable
+    public KeyspaceMetadata getKeyspaceMetadataNullable(String name)
+    {
+        VirtualKeyspace found = virtualKeyspaces.get(name);
+        if (found == null)
+            return null;
+        return found.metadata();
+    }
+
+    /**
+     * @param id a table id
+     * @return the metadata of the table registered under that id, or {@code null} if there is none
+     */
+    @Nullable
+    public TableMetadata getTableMetadataNullable(TableId id)
+    {
+        VirtualTable found = virtualTables.get(id);
+        if (found == null)
+            return null;
+        return found.metadata();
+    }
+
+    /**
+     * @return the metadata of every registered keyspace, as a transforming view over the registry's values
+     */
+    public Iterable<KeyspaceMetadata> virtualKeyspacesMetadata()
+    {
+        return Iterables.transform(virtualKeyspaces.values(), VirtualKeyspace::metadata);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
new file mode 100644
index 0000000..dc32c8c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collection;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * A specialised IMutation implementation for virtual keyspaces.
+ *
+ * Mainly overrides {@link #apply()} to go straight to {@link VirtualTable#apply(PartitionUpdate)} for every table involved.
+ */
+public final class VirtualMutation implements IMutation
+{
+    private final String keyspaceName;
+    private final DecoratedKey partitionKey;
+    private final ImmutableMap<TableId, PartitionUpdate> modifications;
+
+    /**
+     * Creates a mutation holding a single update; keyspace, partition key and table id are read from the update.
+     *
+     * @param update the only partition update of the mutation
+     */
+    public VirtualMutation(PartitionUpdate update)
+    {
+        this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update));
+    }
+
+    /**
+     * @param keyspaceName the name of the keyspace being modified
+     * @param partitionKey the partition key of the mutation
+     * @param modifications the partition updates, keyed by table id
+     */
+    public VirtualMutation(String keyspaceName, DecoratedKey partitionKey, ImmutableMap<TableId, PartitionUpdate> modifications)
+    {
+        this.keyspaceName = keyspaceName;
+        this.partitionKey = partitionKey;
+        this.modifications = modifications;
+    }
+
+    /**
+     * Hands every update to the virtual table registered under the update's table id.
+     *
+     * @throws IllegalStateException if one of the table ids is not known to {@link VirtualKeyspaceRegistry}
+     */
+    @Override
+    public void apply()
+    {
+        modifications.forEach((id, update) ->
+        {
+            VirtualTable table = VirtualKeyspaceRegistry.instance.getTableNullable(id);
+            // the registry lookup is nullable: report the missing table instead of failing with a bare NPE
+            if (table == null)
+                throw new IllegalStateException(String.format("Virtual table with id %s (keyspace %s) is not registered", id, keyspaceName));
+            table.apply(update);
+        });
+    }
+
+    @Override
+    public String getKeyspaceName()
+    {
+        return keyspaceName;
+    }
+
+    @Override
+    public Collection<TableId> getTableIds()
+    {
+        return modifications.keySet();
+    }
+
+    @Override
+    public DecoratedKey key()
+    {
+        return partitionKey;
+    }
+
+    /**
+     * @return the write RPC timeout reported by {@link DatabaseDescriptor}
+     */
+    @Override
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout();
+    }
+
+    /**
+     * @param shallow if {@code true} only the table ids are printed, otherwise the partition updates themselves
+     * @return a textual description of the mutation
+     */
+    @Override
+    public String toString(boolean shallow)
+    {
+        MoreObjects.ToStringHelper helper =
+            MoreObjects.toStringHelper(this)
+                       .add("keyspace", keyspaceName)
+                       .add("partition key", partitionKey);
+
+        if (shallow)
+            helper.add("tables", getTableIds());
+        else
+            helper.add("modifications", getPartitionUpdates());
+
+        return helper.toString();
+    }
+
+    @Override
+    public Collection<PartitionUpdate> getPartitionUpdates()
+    {
+        return modifications.values();
+    }
+
+    @Override
+    public void validateIndexedColumns()
+    {
+        // no-op
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
new file mode 100644
index 0000000..299cc00
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.schema.TableMetadata.builder;
+
+/**
+ * The {@code system_virtual_schema} keyspace: three virtual tables ({@code keyspaces}, {@code tables} and
+ * {@code columns}) describing every keyspace returned by {@link VirtualKeyspaceRegistry#virtualKeyspacesMetadata()}.
+ */
+public final class VirtualSchemaKeyspace extends VirtualKeyspace
+{
+    private static final String NAME = "system_virtual_schema";
+
+    public static final VirtualSchemaKeyspace instance = new VirtualSchemaKeyspace();
+
+    private VirtualSchemaKeyspace()
+    {
+        super(NAME, ImmutableList.of(new VirtualKeyspaces(NAME), new VirtualTables(NAME), new VirtualColumns(NAME)));
+    }
+
+    /**
+     * The {@code keyspaces} table: one row per registered virtual keyspace.
+     */
+    private static final class VirtualKeyspaces extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+
+        private VirtualKeyspaces(String keyspace)
+        {
+            super(builder(keyspace, "keyspaces")
+                  .comment("virtual keyspace definitions")
+                  .kind(TableMetadata.Kind.VIRTUAL)
+                  .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                  .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+                result.row(keyspace.name);
+            return result;
+        }
+    }
+
+    /**
+     * The {@code tables} table: one row per table of every registered virtual keyspace, with the table comment.
+     */
+    private static final class VirtualTables extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+        private static final String TABLE_NAME = "table_name";
+        private static final String COMMENT = "comment";
+
+        private VirtualTables(String keyspace)
+        {
+            super(builder(keyspace, "tables")
+                  .comment("virtual table definitions")
+                  .kind(TableMetadata.Kind.VIRTUAL)
+                  .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                  .addClusteringColumn(TABLE_NAME, UTF8Type.instance)
+                  .addRegularColumn(COMMENT, UTF8Type.instance)
+                  .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+            {
+                for (TableMetadata table : keyspace.tables)
+                {
+                    result.row(table.keyspace, table.name)
+                          .column(COMMENT, table.params.comment);
+                }
+            }
+
+            return result;
+        }
+    }
+
+    /**
+     * The {@code columns} table: one row per column of every table of every registered virtual keyspace.
+     */
+    private static final class VirtualColumns extends AbstractVirtualTable
+    {
+        private static final String KEYSPACE_NAME = "keyspace_name";
+        private static final String TABLE_NAME = "table_name";
+        private static final String COLUMN_NAME = "column_name";
+        private static final String CLUSTERING_ORDER = "clustering_order";
+        private static final String COLUMN_NAME_BYTES = "column_name_bytes";
+        private static final String KIND = "kind";
+        private static final String POSITION = "position";
+        private static final String TYPE = "type";
+
+        private VirtualColumns(String keyspace)
+        {
+            super(builder(keyspace, "columns")
+                  .comment("virtual column definitions")
+                  .kind(TableMetadata.Kind.VIRTUAL)
+                  .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance)
+                  .addClusteringColumn(TABLE_NAME, UTF8Type.instance)
+                  .addClusteringColumn(COLUMN_NAME, UTF8Type.instance)
+                  .addRegularColumn(CLUSTERING_ORDER, UTF8Type.instance)
+                  .addRegularColumn(COLUMN_NAME_BYTES, BytesType.instance)
+                  .addRegularColumn(KIND, UTF8Type.instance)
+                  .addRegularColumn(POSITION, Int32Type.instance)
+                  .addRegularColumn(TYPE, UTF8Type.instance)
+                  .build());
+        }
+
+        public DataSet data()
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata())
+            {
+                for (TableMetadata table : keyspace.tables)
+                {
+                    for (ColumnMetadata column : table.columns())
+                    {
+                        // Lower-case with Locale.ROOT so the exposed text does not depend on the JVM default
+                        // locale (under tr_TR an upper-case 'I' would otherwise become a dotless 'ı').
+                        result.row(column.ksName, column.cfName, column.name.toString())
+                              .column(CLUSTERING_ORDER, column.clusteringOrder().toString().toLowerCase(java.util.Locale.ROOT))
+                              .column(COLUMN_NAME_BYTES, column.name.bytes)
+                              .column(KIND, column.kind.toString().toLowerCase(java.util.Locale.ROOT))
+                              .column(POSITION, column.position())
+                              .column(TYPE, column.type.asCQL3Type().toString());
+                    }
+                }
+            }
+
+            return result;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
new file mode 100644
index 0000000..ea196ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * A system view used to expose system information.
+ * Implementations describe themselves through {@link #metadata()}, accept writes through
+ * {@link #apply(PartitionUpdate)} and serve reads through the two {@code select} overloads.
+ */
+public interface VirtualTable
+{
+ /**
+ * Returns the name of this virtual table, as recorded in {@link #metadata()}.
+ *
+ * @return the table name.
+ */
+ default String name()
+ {
+ return metadata().name;
+ }
+
+ /**
+ * Returns the metadata describing this virtual table.
+ *
+ * @return the table metadata.
+ */
+ TableMetadata metadata();
+
+ /**
+ * Applies the specified update.
+ * @param update the update to apply
+ */
+ void apply(PartitionUpdate update);
+
+ /**
+ * Selects the rows from a single partition.
+ *
+ * @param partitionKey the partition key
+ * @param clusteringIndexFilter the clustering columns to select
+ * @param columnFilter the selected columns
+ * @return the rows corresponding to the requested data.
+ */
+ UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter);
+
+ /**
+ * Selects the rows from a range of partitions.
+ *
+ * @param dataRange the range of data to retrieve
+ * @param columnFilter the selected columns
+ * @return the rows corresponding to the requested data.
+ */
+ UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/IndexRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java
index 9f5ed02..e4c531b 100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@ -21,8 +21,14 @@
package org.apache.cassandra.index;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.TableMetadata;
/**
* The collection of all Index instances for a base table.
@@ -34,9 +40,72 @@ import org.apache.cassandra.schema.IndexMetadata;
*/
public interface IndexRegistry
{
+ /**
+ * An empty {@code IndexRegistry}
+ */
+ public static final IndexRegistry EMPTY = new IndexRegistry()
+ {
+ @Override
+ public void unregisterIndex(Index index)
+ {
+ }
+
+ @Override
+ public void registerIndex(Index index)
+ {
+ }
+
+ @Override
+ public Collection<Index> listIndexes()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Index getIndex(IndexMetadata indexMetadata)
+ {
+ return null;
+ }
+
+ @Override
+ public Optional<Index> getBestIndexFor(RowFilter.Expression expression)
+ {
+ return Optional.empty();
+ }
+
+ @Override
+ public void validate(PartitionUpdate update)
+ {
+ }
+ };
+
void registerIndex(Index index);
void unregisterIndex(Index index);
Index getIndex(IndexMetadata indexMetadata);
Collection<Index> listIndexes();
+
+ Optional<Index> getBestIndexFor(RowFilter.Expression expression);
+
+ /**
+ * Called at write time to ensure that values present in the update
+ * are valid according to the rules of all registered indexes which
+ * will process it. The partition key as well as the clustering and
+ * cell values for each row in the update may be checked by index
+ * implementations
+ *
+ * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+ */
+ void validate(PartitionUpdate update);
+
+ /**
+ * Returns the {@code IndexRegistry} associated with the specified table.
+ *
+ * @param table the table metadata
+ * @return the {@code IndexRegistry} associated with the specified table
+ */
+ public static IndexRegistry obtain(TableMetadata table)
+ {
+ return table.isVirtual() ? EMPTY : Keyspace.openAndGetStore(table).indexManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 9a29c02..fb0d629 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -739,6 +739,7 @@ public abstract class CassandraIndex implements Index
TableMetadata.Builder builder =
TableMetadata.builder(baseCfsMetadata.keyspace, baseCfsMetadata.indexTableName(indexMetadata), baseCfsMetadata.id)
+ .kind(TableMetadata.Kind.INDEX)
// tables for legacy KEYS indexes are non-compound and dense
.isDense(indexMetadata.isKeys())
.isCompound(!indexMetadata.isKeys())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 80a3869..5a72d2c 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@ -36,16 +36,23 @@ import static java.lang.String.format;
*/
public final class KeyspaceMetadata
{
+ public enum Kind
+ {
+ REGULAR, VIRTUAL
+ }
+
public final String name;
+ public final Kind kind;
public final KeyspaceParams params;
public final Tables tables;
public final Views views;
public final Types types;
public final Functions functions;
- private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
+ private KeyspaceMetadata(String name, Kind kind, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
{
this.name = name;
+ this.kind = kind;
this.params = params;
this.tables = tables;
this.views = views;
@@ -55,42 +62,52 @@ public final class KeyspaceMetadata
public static KeyspaceMetadata create(String name, KeyspaceParams params)
{
- return new KeyspaceMetadata(name, params, Tables.none(), Views.none(), Types.none(), Functions.none());
+ return new KeyspaceMetadata(name, Kind.REGULAR, params, Tables.none(), Views.none(), Types.none(), Functions.none());
}
public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables)
{
- return new KeyspaceMetadata(name, params, tables, Views.none(), Types.none(), Functions.none());
+ return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, Views.none(), Types.none(), Functions.none());
}
public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, views, types, functions);
+ }
+
+ public static KeyspaceMetadata virtual(String name, Tables tables)
+ {
+ return new KeyspaceMetadata(name, Kind.VIRTUAL, KeyspaceParams.local(), tables, Views.none(), Types.none(), Functions.none());
}
public KeyspaceMetadata withSwapped(KeyspaceParams params)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Tables regular)
{
- return new KeyspaceMetadata(name, params, regular, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, regular, views, types, functions);
}
public KeyspaceMetadata withSwapped(Views views)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Types types)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
}
public KeyspaceMetadata withSwapped(Functions functions)
{
- return new KeyspaceMetadata(name, params, tables, views, types, functions);
+ return new KeyspaceMetadata(name, kind, params, tables, views, types, functions);
+ }
+
+ public boolean isVirtual()
+ {
+ return kind == Kind.VIRTUAL;
}
public Iterable<TableMetadata> tablesAndViews()
@@ -129,7 +146,7 @@ public final class KeyspaceMetadata
@Override
public int hashCode()
{
- return Objects.hashCode(name, params, tables, views, functions, types);
+ return Objects.hashCode(name, kind, params, tables, views, functions, types);
}
@Override
@@ -144,6 +161,7 @@ public final class KeyspaceMetadata
KeyspaceMetadata other = (KeyspaceMetadata) o;
return name.equals(other.name)
+ && kind == other.kind
&& params.equals(other.params)
&& tables.equals(other.tables)
&& views.equals(other.views)
@@ -156,6 +174,7 @@ public final class KeyspaceMetadata
{
return MoreObjects.toStringHelper(this)
.add("name", name)
+ .add("kind", kind)
.add("params", params)
.add("tables", tables)
.add("views", views)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 594b2ab..09ec62a 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapDifference;
@@ -28,15 +29,12 @@ import com.google.common.collect.Sets;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.KeyspaceNotDefinedException;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.UnknownTableException;
@@ -312,7 +310,8 @@ public final class Schema
public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
{
assert keyspaceName != null;
- return keyspaces.getNullable(keyspaceName);
+ KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
+ return null != keyspace ? keyspace : VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName);
}
private Set<String> getNonSystemKeyspacesSet()
@@ -426,15 +425,17 @@ public final class Schema
assert keyspace != null;
assert table != null;
- KeyspaceMetadata ksm = keyspaces.getNullable(keyspace);
+ KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace);
return ksm == null
? null
: ksm.getTableOrViewNullable(table);
}
+ @Nullable
public TableMetadata getTableMetadata(TableId id)
{
- return keyspaces.getTableOrViewNullable(id);
+ TableMetadata table = keyspaces.getTableOrViewNullable(id);
+ return null != table ? table : VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id);
}
public TableMetadata validateTable(String keyspaceName, String tableName)
@@ -442,7 +443,7 @@ public final class Schema
if (tableName.isEmpty())
throw new InvalidRequestException("non-empty table is required");
- KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
+ KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName);
if (keyspace == null)
throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 638e912..4945fc2 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -1141,7 +1141,7 @@ public final class SchemaKeyspace
TableMetadata metadata =
TableMetadata.builder(keyspaceName, viewName, TableId.fromUUID(row.getUUID("id")))
- .isView(true)
+ .kind(TableMetadata.Kind.VIEW)
.addColumns(columns)
.droppedColumns(fetchDroppedColumns(keyspaceName, viewName))
.params(createTableParamsFromRow(row))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 4634438..47e5b47 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.Objects;
+import javax.annotation.Nullable;
+
import com.google.common.base.MoreObjects;
import com.google.common.collect.*;
@@ -82,15 +84,21 @@ public final class TableMetadata
}
}
+ public enum Kind
+ {
+ REGULAR, INDEX, VIEW, VIRTUAL
+ }
+
public final String keyspace;
public final String name;
public final TableId id;
public final IPartitioner partitioner;
+ public final Kind kind;
public final TableParams params;
public final ImmutableSet<Flag> flags;
- private final boolean isView;
+ @Nullable
private final String indexName; // derived from table name
/*
@@ -139,12 +147,10 @@ public final class TableMetadata
id = builder.id;
partitioner = builder.partitioner;
+ kind = builder.kind;
params = builder.params.build();
- isView = builder.isView;
- indexName = name.contains(".")
- ? name.substring(name.indexOf('.') + 1)
- : null;
+ indexName = kind == Kind.INDEX ? name.substring(name.indexOf('.') + 1) : null;
droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
Collections.sort(builder.partitionKeyColumns);
@@ -184,23 +190,28 @@ public final class TableMetadata
{
return builder(keyspace, name, id)
.partitioner(partitioner)
+ .kind(kind)
.params(params)
.flags(flags)
- .isView(isView)
.addColumns(columns())
.droppedColumns(droppedColumns)
.indexes(indexes)
.triggers(triggers);
}
+ public boolean isIndex()
+ {
+ return kind == Kind.INDEX;
+ }
+
public boolean isView()
{
- return isView;
+ return kind == Kind.VIEW;
}
- public boolean isIndex()
+ public boolean isVirtual()
{
- return indexName != null;
+ return kind == Kind.VIRTUAL;
}
public Optional<String> indexName()
@@ -534,7 +545,7 @@ public final class TableMetadata
private void except(String format, Object... args)
{
- throw new ConfigurationException(keyspace + "." + name + ": " +format(format, args));
+ throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args));
}
@Override
@@ -552,9 +563,9 @@ public final class TableMetadata
&& name.equals(tm.name)
&& id.equals(tm.id)
&& partitioner.equals(tm.partitioner)
+ && kind == tm.kind
&& params.equals(tm.params)
&& flags.equals(tm.flags)
- && isView == tm.isView
&& columns.equals(tm.columns)
&& droppedColumns.equals(tm.droppedColumns)
&& indexes.equals(tm.indexes)
@@ -564,7 +575,7 @@ public final class TableMetadata
@Override
public int hashCode()
{
- return Objects.hash(keyspace, name, id, partitioner, params, flags, isView, columns, droppedColumns, indexes, triggers);
+ return Objects.hash(keyspace, name, id, partitioner, kind, params, flags, columns, droppedColumns, indexes, triggers);
}
@Override
@@ -580,9 +591,9 @@ public final class TableMetadata
.add("table", name)
.add("id", id)
.add("partitioner", partitioner)
+ .add("kind", kind)
.add("params", params)
.add("flags", flags)
- .add("isView", isView)
.add("columns", columns())
.add("droppedColumns", droppedColumns.values())
.add("indexes", indexes)
@@ -598,6 +609,7 @@ public final class TableMetadata
private TableId id;
private IPartitioner partitioner;
+ private Kind kind = Kind.REGULAR;
private TableParams.Builder params = TableParams.builder();
// Setting compound as default as "normal" CQL tables are compound and that's what we want by default
@@ -611,8 +623,6 @@ public final class TableMetadata
private final List<ColumnMetadata> clusteringColumns = new ArrayList<>();
private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>();
- private boolean isView;
-
private Builder(String keyspace, String name, TableId id)
{
this.keyspace = keyspace;
@@ -649,6 +659,12 @@ public final class TableMetadata
return this;
}
+ public Builder kind(Kind val)
+ {
+ kind = val;
+ return this;
+ }
+
public Builder params(TableParams val)
{
params = val.unbuild();
@@ -733,12 +749,6 @@ public final class TableMetadata
return this;
}
- public Builder isView(boolean val)
- {
- isView = val;
- return this;
- }
-
public Builder flags(Set<Flag> val)
{
flags = val;
@@ -979,6 +989,6 @@ public final class TableMetadata
*/
public boolean enforceStrictLiveness()
{
- return isView && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
+ return isView() && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CASRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 1db100d..88fb9bd 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.service;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadQuery;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -30,7 +30,7 @@ public interface CASRequest
/**
* The command to use to fetch the value to compare for the CAS.
*/
- public SinglePartitionReadCommand readCommand(int nowInSec);
+ public SinglePartitionReadQuery readCommand(int nowInSec);
/**
* Returns whether the provided CF, that represents the values fetched using the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 6e0b92b..815e673 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -46,6 +46,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.StartupClusterConnectivityChecker;
import org.apache.cassandra.schema.TableMetadata;
@@ -249,6 +252,8 @@ public class CassandraDaemon
throw e;
}
+ VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
+ VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
// clean up debris in the rest of the keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index c854737..234ac4f 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.*;
+import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -65,8 +66,12 @@ public class ClientState
for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.LEGACY_PEERS, SystemKeyspace.PEERS_V2))
READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf));
+ // make all schema tables readable by default (required by the drivers)
SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table)));
+ // make all virtual schema tables readable by default as well
+ VirtualSchemaKeyspace.instance.tables().forEach(t -> READABLE_SYSTEM_RESOURCES.add(t.metadata().resource));
+
// neither clients nor tools need authentication/authorization
if (DatabaseDescriptor.isDaemonInitialized())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 37bfd17..7e9b0f9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -259,7 +259,7 @@ public class StorageProxy implements StorageProxyMBean
// read the current values and check they validate the conditions
Tracing.trace("Reading existing values for CAS precondition");
- SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+ SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(FBUtilities.nowInSeconds());
ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
FilteredPartition current;
@@ -1633,7 +1633,7 @@ public class StorageProxy implements StorageProxyMBean
public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
- if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands))
+ if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries))
{
readMetrics.unavailables.mark();
readMetricsMap.get(consistencyLevel).unavailables.mark();
@@ -1649,11 +1649,11 @@ public class StorageProxy implements StorageProxyMBean
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
assert state != null;
- if (group.commands.size() > 1)
+ if (group.queries.size() > 1)
throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
long start = System.nanoTime();
- SinglePartitionReadCommand command = group.commands.get(0);
+ SinglePartitionReadCommand command = group.queries.get(0);
TableMetadata metadata = command.metadata();
DecoratedKey key = command.partitionKey();
@@ -1685,7 +1685,7 @@ public class StorageProxy implements StorageProxyMBean
throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint);
}
- result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime);
+ result = fetchRows(group.queries, consistencyForCommitOrFetch, queryStartNanoTime);
}
catch (UnavailableException e)
{
@@ -1727,13 +1727,13 @@ public class StorageProxy implements StorageProxyMBean
long start = System.nanoTime();
try
{
- PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime);
+ PartitionIterator result = fetchRows(group.queries, consistencyLevel, queryStartNanoTime);
// Note that the only difference between the commands in a group must be the partition key to which
// they apply.
- boolean enforceStrictLiveness = group.commands.get(0).metadata().enforceStrictLiveness();
+ boolean enforceStrictLiveness = group.queries.get(0).metadata().enforceStrictLiveness();
// If we have more than one command, then despite each read command honoring the limit, the total result
// might not honor it and so we should enforce it
- if (group.commands.size() > 1)
+ if (group.queries.size() > 1)
result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition(), enforceStrictLiveness);
return result;
}
@@ -1761,7 +1761,7 @@ public class StorageProxy implements StorageProxyMBean
readMetrics.addNano(latency);
readMetricsMap.get(consistencyLevel).addNano(latency);
// TODO avoid giving every command the same latency number. Can fix this in CASSANDRA-5329
- for (ReadCommand command : group.commands)
+ for (ReadCommand command : group.queries)
Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8570f10..4214644 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -69,6 +69,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Verifier;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token.TokenFactory;
@@ -3456,12 +3457,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
- private Keyspace getValidKeyspace(String keyspaceName) throws IOException
+ private void verifyKeyspaceIsValid(String keyspaceName)
{
+ if (null != VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspaceName))
+ throw new IllegalArgumentException("Cannot perform any operations against virtual keyspace " + keyspaceName);
+
if (!Schema.instance.getKeyspaces().contains(keyspaceName))
- {
- throw new IOException("Keyspace " + keyspaceName + " does not exist");
- }
+ throw new IllegalArgumentException("Keyspace " + keyspaceName + " does not exist");
+ }
+
+ private Keyspace getValidKeyspace(String keyspaceName)
+ {
+ verifyKeyspaceIsValid(keyspaceName);
return Keyspace.open(keyspaceName);
}
@@ -4787,6 +4794,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void truncate(String keyspace, String table) throws TimeoutException, IOException
{
+ verifyKeyspaceIsValid(keyspace);
+
try
{
StorageProxy.truncateBlocking(keyspace, table);
@@ -5249,6 +5258,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (!isInitialized())
throw new RuntimeException("Not yet initialized, can't load new sstables");
+ verifyKeyspaceIsValid(ksName);
ColumnFamilyStore.loadNewSSTables(ksName, cfName);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 8ebbdf7..da64a0c 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -26,9 +26,9 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.ProtocolVersion;
-abstract class AbstractQueryPager implements QueryPager
+abstract class AbstractQueryPager<T extends ReadQuery> implements QueryPager
{
- protected final ReadCommand command;
+ protected final T query;
protected final DataLimits limits;
protected final ProtocolVersion protocolVersion;
private final boolean enforceStrictLiveness;
@@ -43,12 +43,12 @@ abstract class AbstractQueryPager implements QueryPager
private boolean exhausted;
- protected AbstractQueryPager(ReadCommand command, ProtocolVersion protocolVersion)
+ protected AbstractQueryPager(T query, ProtocolVersion protocolVersion)
{
- this.command = command;
+ this.query = query;
this.protocolVersion = protocolVersion;
- this.limits = command.limits();
- this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+ this.limits = query.limits();
+ this.enforceStrictLiveness = query.metadata().enforceStrictLiveness();
this.remaining = limits.count();
this.remainingInPartition = limits.perPartitionCount();
@@ -56,7 +56,7 @@ abstract class AbstractQueryPager implements QueryPager
public ReadExecutionController executionController()
{
- return command.executionController();
+ return query.executionController();
}
public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime)
@@ -65,8 +65,8 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
- return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
+ Pager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec());
+ return Transformation.apply(nextPageReadQuery(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
}
public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
@@ -75,8 +75,8 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
- return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager);
+ RowPager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec());
+ return Transformation.apply(nextPageReadQuery(pageSize).executeInternal(executionController), pager);
}
public UnfilteredPartitionIterator fetchPageUnfiltered(TableMetadata metadata, int pageSize, ReadExecutionController executionController)
@@ -85,9 +85,9 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.unfilteredPartition(metadata);
pageSize = Math.min(pageSize, remaining);
- UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());
+ UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), query.nowInSec());
- return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(executionController), pager);
+ return Transformation.apply(nextPageReadQuery(pageSize).executeLocally(executionController), pager);
}
private class UnfilteredPager extends Pager<Unfiltered>
@@ -128,7 +128,7 @@ abstract class AbstractQueryPager implements QueryPager
private Pager(DataLimits pageLimits, int nowInSec)
{
- this.counter = pageLimits.newCounter(nowInSec, true, command.selectsFullPartition(), enforceStrictLiveness);
+ this.counter = pageLimits.newCounter(nowInSec, true, query.selectsFullPartition(), enforceStrictLiveness);
this.pageLimits = pageLimits;
}
@@ -228,7 +228,7 @@ abstract class AbstractQueryPager implements QueryPager
return remainingInPartition;
}
- protected abstract ReadCommand nextPageReadCommand(int pageSize);
+ protected abstract T nextPageReadQuery(int pageSize);
protected abstract void recordLast(DecoratedKey key, Row row);
protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 9dae11c..ca16967 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -31,20 +31,20 @@ import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.service.ClientState;
/**
- * Pager over a list of ReadCommand.
+ * Pager over a list of SinglePartitionReadQuery.
*
- * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before
- * returning results from the next one, but if the result returned by each command is small (compared to pageSize),
- * paging the commands one at a time under-performs compared to parallelizing. On the other, if we parallelize
- * and each command raised pageSize results, we'll end up with commands.size() * pageSize results in memory, which
+ * Note that this is not easy to make efficient. Indeed, we need to page the first query fully before
+ * returning results from the next one, but if the result returned by each query is small (compared to pageSize),
+ * paging the queries one at a time under-performs compared to parallelizing. On the other hand, if we parallelize
+ * and each query returns pageSize results, we'll end up with queries.size() * pageSize results in memory, which
* defeats the purpose of paging.
*
- * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
+ * For now, we keep it simple (somewhat) and just do one query at a time. Provided that we make sure to not
* create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
- * cfs meanPartitionSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * cfs meanPartitionSize to decide if parallelizing some of the queries might be worth it while being confident we don't
* blow out memory.
*/
-public class MultiPartitionPager implements QueryPager
+public class MultiPartitionPager<T extends SinglePartitionReadQuery> implements QueryPager
{
private final SinglePartitionPager[] pagers;
private final DataLimits limit;
@@ -54,33 +54,33 @@ public class MultiPartitionPager implements QueryPager
private int remaining;
private int current;
- public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, ProtocolVersion protocolVersion)
+ public MultiPartitionPager(SinglePartitionReadQuery.Group<T> group, PagingState state, ProtocolVersion protocolVersion)
{
this.limit = group.limits();
this.nowInSec = group.nowInSec();
int i = 0;
- // If it's not the beginning (state != null), we need to find where we were and skip previous commands
+ // If it's not the beginning (state != null), we need to find where we were and skip previous queries
// since they are done.
if (state != null)
- for (; i < group.commands.size(); i++)
- if (group.commands.get(i).partitionKey().getKey().equals(state.partitionKey))
+ for (; i < group.queries.size(); i++)
+ if (group.queries.get(i).partitionKey().getKey().equals(state.partitionKey))
break;
- if (i >= group.commands.size())
+ if (i >= group.queries.size())
{
pagers = null;
return;
}
- pagers = new SinglePartitionPager[group.commands.size() - i];
+ pagers = new SinglePartitionPager[group.queries.size() - i];
// 'i' is on the first non exhausted pager for the previous page (or the first one)
- SinglePartitionReadCommand command = group.commands.get(i);
- pagers[0] = command.getPager(state, protocolVersion);
+ T query = group.queries.get(i);
+ pagers[0] = query.getPager(state, protocolVersion);
// Following ones haven't been started yet
- for (int j = i + 1; j < group.commands.size(); j++)
- pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion);
+ for (int j = i + 1; j < group.queries.size(); j++)
+ pagers[j - i] = group.queries.get(j).getPager(null, protocolVersion);
remaining = state == null ? limit.count() : state.remaining;
}
@@ -103,11 +103,11 @@ public class MultiPartitionPager implements QueryPager
SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length);
newPagers[current] = newPagers[current].withUpdatedLimit(newLimits);
- return new MultiPartitionPager(newPagers,
- newLimits,
- nowInSec,
- remaining,
- current);
+ return new MultiPartitionPager<T>(newPagers,
+ newLimits,
+ nowInSec,
+ remaining,
+ current);
}
public PagingState state()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index ba6862d..cebf3c6 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -21,37 +21,36 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.transport.ProtocolVersion;
/**
- * Pages a PartitionRangeReadCommand.
+ * Pages a PartitionRangeReadQuery.
*/
-public class PartitionRangeQueryPager extends AbstractQueryPager
+public class PartitionRangeQueryPager extends AbstractQueryPager<PartitionRangeReadQuery>
{
private volatile DecoratedKey lastReturnedKey;
private volatile PagingState.RowMark lastReturnedRow;
- public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, ProtocolVersion protocolVersion)
+ public PartitionRangeQueryPager(PartitionRangeReadQuery query, PagingState state, ProtocolVersion protocolVersion)
{
- super(command, protocolVersion);
+ super(query, protocolVersion);
if (state != null)
{
- lastReturnedKey = command.metadata().partitioner.decorateKey(state.partitionKey);
+ lastReturnedKey = query.metadata().partitioner.decorateKey(state.partitionKey);
lastReturnedRow = state.rowMark;
restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
}
}
- public PartitionRangeQueryPager(ReadCommand command,
+ public PartitionRangeQueryPager(PartitionRangeReadQuery query,
ProtocolVersion protocolVersion,
DecoratedKey lastReturnedKey,
PagingState.RowMark lastReturnedRow,
int remaining,
int remainingInPartition)
{
- super(command, protocolVersion);
+ super(query, protocolVersion);
this.lastReturnedKey = lastReturnedKey;
this.lastReturnedRow = lastReturnedRow;
restoreState(lastReturnedKey, remaining, remainingInPartition);
@@ -59,7 +58,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
public PartitionRangeQueryPager withUpdatedLimit(DataLimits newLimits)
{
- return new PartitionRangeQueryPager(command.withUpdatedLimit(newLimits),
+ return new PartitionRangeQueryPager(query.withUpdatedLimit(newLimits),
protocolVersion,
lastReturnedKey,
lastReturnedRow,
@@ -74,16 +73,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
: new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition());
}
- protected ReadCommand nextPageReadCommand(int pageSize)
- throws RequestExecutionException
+ @Override
+ protected PartitionRangeReadQuery nextPageReadQuery(int pageSize)
{
DataLimits limits;
- DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange();
+ DataRange fullRange = query.dataRange();
DataRange pageRange;
if (lastReturnedKey == null)
{
pageRange = fullRange;
- limits = command.limits().forPaging(pageSize);
+ limits = query.limits().forPaging(pageSize);
}
else
{
@@ -92,17 +91,17 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
if (includeLastKey)
{
- pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false);
- limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
+ pageRange = fullRange.forPaging(bounds, query.metadata().comparator, lastReturnedRow.clustering(query.metadata()), false);
+ limits = query.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
}
else
{
pageRange = fullRange.forSubRange(bounds);
- limits = command.limits().forPaging(pageSize);
+ limits = query.limits().forPaging(pageSize);
}
}
- return ((PartitionRangeReadCommand) command).withUpdatedLimitsAndDataRange(limits, pageRange);
+ return query.withUpdatedLimitsAndDataRange(limits, pageRange);
}
protected void recordLast(DecoratedKey key, Row last)
@@ -111,7 +110,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
{
lastReturnedKey = key;
if (last.clustering() != Clustering.STATIC_CLUSTERING)
- lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+ lastReturnedRow = PagingState.RowMark.create(query.metadata(), last, protocolVersion);
}
}
@@ -123,18 +122,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey)
{
- AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
+ AbstractBounds<PartitionPosition> bounds = query.dataRange().keyRange();
if (bounds instanceof Range || bounds instanceof Bounds)
{
return includeLastKey
- ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right)
- : new Range<PartitionPosition>(lastReturnedKey, bounds.right);
- }
- else
- {
- return includeLastKey
- ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right)
- : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right);
+ ? new Bounds<>(lastReturnedKey, bounds.right)
+ : new Range<>(lastReturnedKey, bounds.right);
}
+
+ return includeLastKey
+ ? new IncludingExcludingBounds<>(lastReturnedKey, bounds.right)
+ : new ExcludingBounds<>(lastReturnedKey, bounds.right);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index e95c358..93a0265 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -29,40 +29,36 @@ import org.apache.cassandra.transport.ProtocolVersion;
*
* For use by MultiPartitionPager.
*/
-public class SinglePartitionPager extends AbstractQueryPager
+public class SinglePartitionPager extends AbstractQueryPager<SinglePartitionReadQuery>
{
- private final SinglePartitionReadCommand command;
-
private volatile PagingState.RowMark lastReturned;
- public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, ProtocolVersion protocolVersion)
+ public SinglePartitionPager(SinglePartitionReadQuery query, PagingState state, ProtocolVersion protocolVersion)
{
- super(command, protocolVersion);
- this.command = command;
+ super(query, protocolVersion);
if (state != null)
{
lastReturned = state.rowMark;
- restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
+ restoreState(query.partitionKey(), state.remaining, state.remainingInPartition);
}
}
- private SinglePartitionPager(SinglePartitionReadCommand command,
+ private SinglePartitionPager(SinglePartitionReadQuery query,
ProtocolVersion protocolVersion,
PagingState.RowMark rowMark,
int remaining,
int remainingInPartition)
{
- super(command, protocolVersion);
- this.command = command;
+ super(query, protocolVersion);
this.lastReturned = rowMark;
- restoreState(command.partitionKey(), remaining, remainingInPartition);
+ restoreState(query.partitionKey(), remaining, remainingInPartition);
}
@Override
public SinglePartitionPager withUpdatedLimit(DataLimits newLimits)
{
- return new SinglePartitionPager(command.withUpdatedLimit(newLimits),
+ return new SinglePartitionPager(query.withUpdatedLimit(newLimits),
protocolVersion,
lastReturned,
maxRemaining(),
@@ -71,12 +67,12 @@ public class SinglePartitionPager extends AbstractQueryPager
public ByteBuffer key()
{
- return command.partitionKey().getKey();
+ return query.partitionKey().getKey();
}
public DataLimits limits()
{
- return command.limits();
+ return query.limits();
}
public PagingState state()
@@ -86,20 +82,21 @@ public class SinglePartitionPager extends AbstractQueryPager
: new PagingState(null, lastReturned, maxRemaining(), remainingInPartition());
}
- protected ReadCommand nextPageReadCommand(int pageSize)
+ @Override
+ protected SinglePartitionReadQuery nextPageReadQuery(int pageSize)
{
- Clustering clustering = lastReturned == null ? null : lastReturned.clustering(command.metadata());
+ Clustering clustering = lastReturned == null ? null : lastReturned.clustering(query.metadata());
DataLimits limits = lastReturned == null
? limits().forPaging(pageSize)
: limits().forPaging(pageSize, key(), remainingInPartition());
- return command.forPaging(clustering, limits);
+ return query.forPaging(clustering, limits);
}
protected void recordLast(DecoratedKey key, Row last)
{
if (last != null && last.clustering() != Clustering.STATIC_CLUSTERING)
- lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
+ lastReturned = PagingState.RowMark.create(query.metadata(), last, protocolVersion);
}
protected boolean isPreviouslyReturnedPartition(DecoratedKey key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 7b8ef94..0852312 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -186,6 +186,7 @@ public class CacheProviderTest
assertEquals(key1.hashCode(), key2.hashCode());
tm = TableMetadata.builder("ks", "tab.indexFoo", id1)
+ .kind(TableMetadata.Kind.INDEX)
.addPartitionKeyColumn("pk", UTF8Type.instance)
.indexes(Indexes.of(IndexMetadata.fromSchemaMetadata("indexFoo", IndexMetadata.Kind.KEYS, Collections.emptyMap())))
.build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index e53342d..662e804 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -837,6 +837,11 @@ public abstract class CQLTester
return sessionNet(protocolVersion).execute(formatQuery(query), values);
}
+ protected com.datastax.driver.core.ResultSet executeNet(String query, Object... values) throws Throwable
+ {
+ return sessionNet().execute(formatQuery(query), values);
+ }
+
protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable
{
return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org