You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/03/10 01:44:41 UTC
calcite git commit: [CALCITE-1104] Materialized views in Cassandra
(Michael Mior)
Repository: calcite
Updated Branches:
refs/heads/master b58200bcf -> aacb2375e
[CALCITE-1104] Materialized views in Cassandra (Michael Mior)
Close apache/calcite#200
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/aacb2375
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/aacb2375
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/aacb2375
Branch: refs/heads/master
Commit: aacb2375e2e65249d300c847c712f342d93e6d9e
Parents: b58200b
Author: Michael Mior <mm...@uwaterloo.ca>
Authored: Wed Feb 24 15:56:06 2016 -0500
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Mar 9 14:23:54 2016 -0800
----------------------------------------------------------------------
cassandra/pom.xml | 4 +
.../adapter/cassandra/CassandraSchema.java | 126 +++++++++++++++++--
.../cassandra/CassandraSchemaFactory.java | 2 +-
.../adapter/cassandra/CassandraTable.java | 14 ++-
.../apache/calcite/test/CassandraAdapterIT.java | 9 ++
pom.xml | 2 +-
6 files changed, 142 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index d99fb16..1231c1d 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -37,6 +37,10 @@ limitations under the License.
<!-- Sorted by groupId, artifactId; calcite dependencies first. Put versions
in dependencyManagement in the root POM, not here. -->
<dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<type>jar</type>
http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
index 4745aa1..47312bf 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -16,26 +16,48 @@
*/
package org.apache.calcite.adapter.cassandra;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.MaterializedViewTable;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.trace.CalciteTrace;
+import com.datastax.driver.core.AbstractTableMetadata;
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ClusteringOrder;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.MaterializedViewMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -46,6 +68,10 @@ import java.util.Map;
public class CassandraSchema extends AbstractSchema {
final Session session;
final String keyspace;
+ private final SchemaPlus parentSchema;
+ final String name;
+
+ protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
/**
* Creates a Cassandra schema.
@@ -53,7 +79,7 @@ public class CassandraSchema extends AbstractSchema {
* @param host Cassandra host, e.g. "localhost"
* @param keyspace Cassandra keyspace name, e.g. "twissandra"
*/
- public CassandraSchema(String host, String keyspace) {
+ public CassandraSchema(String host, String keyspace, SchemaPlus parentSchema, String name) {
super();
this.keyspace = keyspace;
@@ -63,10 +89,24 @@ public class CassandraSchema extends AbstractSchema {
} catch (Exception e) {
throw new RuntimeException(e);
}
+ this.parentSchema = parentSchema;
+ this.name = name;
+
+ Hook.TRIMMED.add(new Function<RelNode, Void>() {
+ public Void apply(RelNode node) {
+ CassandraSchema.this.addMaterializedViews();
+ return null;
+ }
+ });
}
- RelProtoDataType getRelDataType(String columnFamily) {
- List<ColumnMetadata> columns = getKeyspace().getTable(columnFamily).getColumns();
+ RelProtoDataType getRelDataType(String columnFamily, boolean view) {
+ List<ColumnMetadata> columns;
+ if (view) {
+ columns = getKeyspace().getMaterializedView(columnFamily).getColumns();
+ } else {
+ columns = getKeyspace().getTable(columnFamily).getColumns();
+ }
// Temporary type factory, just for the duration of this method. Allowable
// because we're creating a proto-type, not a type; before being used, the
@@ -103,8 +143,13 @@ public class CassandraSchema extends AbstractSchema {
*
* @return A list of field names that are part of the partition and clustering keys
*/
- Pair<List<String>, List<String>> getKeyFields(String columnFamily) {
- TableMetadata table = getKeyspace().getTable(columnFamily);
+ Pair<List<String>, List<String>> getKeyFields(String columnFamily, boolean view) {
+ AbstractTableMetadata table;
+ if (view) {
+ table = getKeyspace().getMaterializedView(columnFamily);
+ } else {
+ table = getKeyspace().getTable(columnFamily);
+ }
List<ColumnMetadata> partitionKey = table.getPartitionKey();
List<String> pKeyFields = new ArrayList<String>();
@@ -126,13 +171,19 @@ public class CassandraSchema extends AbstractSchema {
*
* @return A RelCollations representing the collation of all clustering keys
*/
- public List<RelFieldCollation> getClusteringOrder(String columnFamily) {
- TableMetadata table = getKeyspace().getTable(columnFamily);
- List<TableMetadata.Order> clusteringOrder = table.getClusteringOrder();
+ public List<RelFieldCollation> getClusteringOrder(String columnFamily, boolean view) {
+ AbstractTableMetadata table;
+ if (view) {
+ table = getKeyspace().getMaterializedView(columnFamily);
+ } else {
+ table = getKeyspace().getTable(columnFamily);
+ }
+
+ List<ClusteringOrder> clusteringOrder = table.getClusteringOrder();
List<RelFieldCollation> keyCollations = new ArrayList<RelFieldCollation>();
int i = 0;
- for (TableMetadata.Order order : clusteringOrder) {
+ for (ClusteringOrder order : clusteringOrder) {
RelFieldCollation.Direction direction;
switch(order) {
case DESC:
@@ -150,11 +201,68 @@ public class CassandraSchema extends AbstractSchema {
return keyCollations;
}
+ /** Registers each materialized view of the keyspace on sub-schema {@code name} of the parent
+ * schema, rebuilding its SELECT from driver metadata; unparseable views are logged and skipped. */
+ private void addMaterializedViews() {
+ for (MaterializedViewMetadata view : getKeyspace().getMaterializedViews()) {
+ String tableName = view.getBaseTable().getName(); // base table the view is defined over
+ StringBuilder queryBuilder = new StringBuilder("SELECT ");
+
+ // Add every column of the view to the select list, each wrapped in double quotes
+ List<String> columnNames = new ArrayList<String>();
+ for (ColumnMetadata column : view.getColumns()) {
+ columnNames.add("\"" + column.getName() + "\"");
+ }
+ queryBuilder.append(Util.toString(columnNames, "", ", ", ""));
+
+ queryBuilder.append(" FROM \"" + tableName + "\"");
+
+ // Fetch the view's where_clause text from system_schema.views and append it verbatim
+ String whereQuery = "SELECT where_clause from system_schema.views "
+ + "WHERE keyspace_name='" + keyspace + "' AND view_name='" + view.getName() + "'"; // NOTE(review): names are concatenated into the CQL text unescaped
+ queryBuilder.append(" WHERE " + session.execute(whereQuery).one().getString(0)); // NOTE(review): assumes a row is returned; result of one() is dereferenced unchecked -- confirm
+
+ // Parse and unparse the view query to get properly quoted field names
+ String query = queryBuilder.toString();
+ SqlParser.ConfigBuilder configBuilder = SqlParser.configBuilder();
+ configBuilder.setUnquotedCasing(Casing.UNCHANGED); // leave the case of unquoted identifiers as written
+
+ SqlSelect parsedQuery;
+ try {
+ parsedQuery = (SqlSelect) SqlParser.create(query, configBuilder.build()).parseQuery();
+ } catch (SqlParseException e) {
+ LOGGER.warn("Could not parse query {} for CQL view {}.{}",
+ query, keyspace, view.getName());
+ continue; // skip only this view; the remaining views are still processed
+ }
+
+ StringWriter stringWriter = new StringWriter(query.length());
+ PrintWriter printWriter = new PrintWriter(stringWriter);
+ SqlWriter writer = new SqlPrettyWriter(SqlDialect.CALCITE, true, printWriter);
+ parsedQuery.unparse(writer, 0, 0);
+ query = stringWriter.toString(); // from here on, query holds the unparsed (re-quoted) SQL
+
+ // Add the view for this query under a generated "$<n>" name
+ String viewName = "$" + getTableNames().size(); // NOTE(review): confirm this yields a distinct name for each view in the loop
+ SchemaPlus schema = parentSchema.getSubSchema(name);
+ CalciteSchema calciteSchema = CalciteSchema.from(schema);
+
+ schema.add(viewName,
+ MaterializedViewTable.create(calciteSchema, query,
+ null, view.getName(), true));
+ }
+ }
+
@Override protected Map<String, Table> getTableMap() { // maps every table of the keyspace, and every view of each table, to a CassandraTable
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (TableMetadata table : getKeyspace().getTables()) {
String tableName = table.getName();
builder.put(tableName, new CassandraTable(this, tableName));
+
+ for (MaterializedViewMetadata view : table.getViews()) { // views are listed under their base table
+ String viewName = view.getName();
+ builder.put(viewName, new CassandraTable(this, viewName, true)); // third argument marks the entry as a view
+ }
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
index 7e52717..c380721 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
@@ -35,7 +35,7 @@ public class CassandraSchemaFactory implements SchemaFactory {
Map map = (Map) operand;
String host = (String) map.get("host");
String keyspace = (String) map.get("keyspace");
- return new CassandraSchema(host, keyspace);
+ return new CassandraSchema(host, keyspace, parentSchema, name);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
index 323d472..80f131e 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -57,11 +57,17 @@ public class CassandraTable extends AbstractQueryableTable
List<RelFieldCollation> clusteringOrder;
private final CassandraSchema schema;
private final String columnFamily;
+ private final boolean view;
- public CassandraTable(CassandraSchema schema, String columnFamily) {
+ public CassandraTable(CassandraSchema schema, String columnFamily, boolean view) {
super(Object[].class);
this.schema = schema;
this.columnFamily = columnFamily;
+ this.view = view;
+ }
+
+ public CassandraTable(CassandraSchema schema, String columnFamily) {
+ this(schema, columnFamily, false);
}
public String toString() {
@@ -70,21 +76,21 @@ public class CassandraTable extends AbstractQueryableTable
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (protoRowType == null) {
- protoRowType = schema.getRelDataType(columnFamily);
+ protoRowType = schema.getRelDataType(columnFamily, view);
}
return protoRowType.apply(typeFactory);
}
public Pair<List<String>, List<String>> getKeyFields() {
if (keyFields == null) {
- keyFields = schema.getKeyFields(columnFamily);
+ keyFields = schema.getKeyFields(columnFamily, view);
}
return keyFields;
}
public List<RelFieldCollation> getClusteringOrder() {
if (clusteringOrder == null) {
- clusteringOrder = schema.getClusteringOrder(columnFamily);
+ clusteringOrder = schema.getClusteringOrder(columnFamily, view);
}
return clusteringOrder;
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
index 8ef08c5..8cf6673 100644
--- a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
@@ -97,6 +97,15 @@ public class CassandraAdapterIT {
+ " CassandraSort(fetch=[1])\n"
+ " CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
}
+
+ @Test public void testMaterializedView() { // expects a filter on "tweets" to be planned as a scan of tweets_by_user
+ CalciteAssert.that()
+ .enable(enabled())
+ .with(TWISSANDRA)
+ .query("select \"tweet_id\" from \"tweets\" where \"username\"='JmuhsAaMdw'")
+ .enableMaterializations(true)
+ .explainContains("CassandraTableScan(table=[[twissandra, tweets_by_user]])"); // the plan must scan the view, not the base table
+ }
}
// End CassandraAdapterIT.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c45df4e..65b4626 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@ limitations under the License.
<airlift-tpch.version>0.1</airlift-tpch.version>
<avatica.version>1.7.0-SNAPSHOT</avatica.version>
<build-helper-maven-plugin.version>1.9</build-helper-maven-plugin.version>
- <cassandra-driver-core.version>2.1.9</cassandra-driver-core.version>
+ <cassandra-driver-core.version>3.0.0</cassandra-driver-core.version>
<checksum-maven-plugin.version>1.2</checksum-maven-plugin.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<commons-lang3.version>3.2</commons-lang3.version>