You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was lost when this message was converted to plain text).
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/03/10 01:44:41 UTC

calcite git commit: [CALCITE-1104] Materialized views in Cassandra (Michael Mior)

Repository: calcite
Updated Branches:
  refs/heads/master b58200bcf -> aacb2375e


[CALCITE-1104] Materialized views in Cassandra (Michael Mior)

Close apache/calcite#200


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/aacb2375
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/aacb2375
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/aacb2375

Branch: refs/heads/master
Commit: aacb2375e2e65249d300c847c712f342d93e6d9e
Parents: b58200b
Author: Michael Mior <mm...@uwaterloo.ca>
Authored: Wed Feb 24 15:56:06 2016 -0500
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Mar 9 14:23:54 2016 -0800

----------------------------------------------------------------------
 cassandra/pom.xml                               |   4 +
 .../adapter/cassandra/CassandraSchema.java      | 126 +++++++++++++++++--
 .../cassandra/CassandraSchemaFactory.java       |   2 +-
 .../adapter/cassandra/CassandraTable.java       |  14 ++-
 .../apache/calcite/test/CassandraAdapterIT.java |   9 ++
 pom.xml                                         |   2 +-
 6 files changed, 142 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
index d99fb16..1231c1d 100644
--- a/cassandra/pom.xml
+++ b/cassandra/pom.xml
@@ -37,6 +37,10 @@ limitations under the License.
     <!-- Sorted by groupId, artifactId; calcite dependencies first. Put versions
          in dependencyManagement in the root POM, not here. -->
     <dependency>
+      <groupId>org.apache.calcite.avatica</groupId>
+      <artifactId>avatica</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.calcite</groupId>
       <artifactId>calcite-core</artifactId>
       <type>jar</type>

http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
index 4745aa1..47312bf 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -16,26 +16,48 @@
  */
 package org.apache.calcite.adapter.cassandra;
 
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.MaterializedViewTable;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.trace.CalciteTrace;
 
+import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ClusteringOrder;
 import com.datastax.driver.core.ColumnMetadata;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.MaterializedViewMetadata;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TableMetadata;
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import org.slf4j.Logger;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +68,10 @@ import java.util.Map;
 public class CassandraSchema extends AbstractSchema {
   final Session session;
   final String keyspace;
+  private final SchemaPlus parentSchema;
+  final String name;
+
+  protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
 
   /**
    * Creates a Cassandra schema.
@@ -53,7 +79,7 @@ public class CassandraSchema extends AbstractSchema {
    * @param host Cassandra host, e.g. "localhost"
    * @param keyspace Cassandra keyspace name, e.g. "twissandra"
    */
-  public CassandraSchema(String host, String keyspace) {
+  public CassandraSchema(String host, String keyspace, SchemaPlus parentSchema, String name) {
     super();
 
     this.keyspace = keyspace;
@@ -63,10 +89,24 @@ public class CassandraSchema extends AbstractSchema {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
+    this.parentSchema = parentSchema;
+    this.name = name;
+
+    Hook.TRIMMED.add(new Function<RelNode, Void>() {
+      public Void apply(RelNode node) {
+        CassandraSchema.this.addMaterializedViews();
+        return null;
+      }
+    });
   }
 
-  RelProtoDataType getRelDataType(String columnFamily) {
-    List<ColumnMetadata> columns = getKeyspace().getTable(columnFamily).getColumns();
+  RelProtoDataType getRelDataType(String columnFamily, boolean view) {
+    List<ColumnMetadata> columns;
+    if (view) {
+      columns = getKeyspace().getMaterializedView(columnFamily).getColumns();
+    } else {
+      columns = getKeyspace().getTable(columnFamily).getColumns();
+    }
 
     // Temporary type factory, just for the duration of this method. Allowable
     // because we're creating a proto-type, not a type; before being used, the
@@ -103,8 +143,13 @@ public class CassandraSchema extends AbstractSchema {
    *
    * @return A list of field names that are part of the partition and clustering keys
    */
-  Pair<List<String>, List<String>> getKeyFields(String columnFamily) {
-    TableMetadata table = getKeyspace().getTable(columnFamily);
+  Pair<List<String>, List<String>> getKeyFields(String columnFamily, boolean view) {
+    AbstractTableMetadata table;
+    if (view) {
+      table = getKeyspace().getMaterializedView(columnFamily);
+    } else {
+      table = getKeyspace().getTable(columnFamily);
+    }
 
     List<ColumnMetadata> partitionKey = table.getPartitionKey();
     List<String> pKeyFields = new ArrayList<String>();
@@ -126,13 +171,19 @@ public class CassandraSchema extends AbstractSchema {
    *
    * @return A RelCollations representing the collation of all clustering keys
    */
-  public List<RelFieldCollation> getClusteringOrder(String columnFamily) {
-    TableMetadata table = getKeyspace().getTable(columnFamily);
-    List<TableMetadata.Order> clusteringOrder = table.getClusteringOrder();
+  public List<RelFieldCollation> getClusteringOrder(String columnFamily, boolean view) {
+    AbstractTableMetadata table;
+    if (view) {
+      table = getKeyspace().getMaterializedView(columnFamily);
+    } else {
+      table = getKeyspace().getTable(columnFamily);
+    }
+
+    List<ClusteringOrder> clusteringOrder = table.getClusteringOrder();
     List<RelFieldCollation> keyCollations = new ArrayList<RelFieldCollation>();
 
     int i = 0;
-    for (TableMetadata.Order order : clusteringOrder) {
+    for (ClusteringOrder order : clusteringOrder) {
       RelFieldCollation.Direction direction;
       switch(order) {
       case DESC:
@@ -150,11 +201,68 @@ public class CassandraSchema extends AbstractSchema {
     return keyCollations;
   }
 
+  /** Add all materialized views defined in the schema to this column family
+   */
+  private void addMaterializedViews() {
+    for (MaterializedViewMetadata view : getKeyspace().getMaterializedViews()) {
+      String tableName = view.getBaseTable().getName();
+      StringBuilder queryBuilder = new StringBuilder("SELECT ");
+
+      // Add all the selected columns to the query
+      List<String> columnNames = new ArrayList<String>();
+      for (ColumnMetadata column : view.getColumns()) {
+        columnNames.add("\"" + column.getName() + "\"");
+      }
+      queryBuilder.append(Util.toString(columnNames, "", ", ", ""));
+
+      queryBuilder.append(" FROM \"" + tableName + "\"");
+
+      // Get the where clause from the system schema
+      String whereQuery = "SELECT where_clause from system_schema.views "
+          + "WHERE keyspace_name='" + keyspace + "' AND view_name='" + view.getName() + "'";
+      queryBuilder.append(" WHERE " + session.execute(whereQuery).one().getString(0));
+
+      // Parse and unparse the view query to get properly quoted field names
+      String query = queryBuilder.toString();
+      SqlParser.ConfigBuilder configBuilder = SqlParser.configBuilder();
+      configBuilder.setUnquotedCasing(Casing.UNCHANGED);
+
+      SqlSelect parsedQuery;
+      try {
+        parsedQuery = (SqlSelect) SqlParser.create(query, configBuilder.build()).parseQuery();
+      } catch (SqlParseException e) {
+        LOGGER.warn("Could not parse query {} for CQL view {}.{}",
+            query, keyspace, view.getName());
+        continue;
+      }
+
+      StringWriter stringWriter = new StringWriter(query.length());
+      PrintWriter printWriter = new PrintWriter(stringWriter);
+      SqlWriter writer = new SqlPrettyWriter(SqlDialect.CALCITE, true, printWriter);
+      parsedQuery.unparse(writer, 0, 0);
+      query = stringWriter.toString();
+
+      // Add the view for this query
+      String viewName = "$" + getTableNames().size();
+      SchemaPlus schema = parentSchema.getSubSchema(name);
+      CalciteSchema calciteSchema = CalciteSchema.from(schema);
+
+      schema.add(viewName,
+            MaterializedViewTable.create(calciteSchema, query,
+            null, view.getName(), true));
+    }
+  }
+
   @Override protected Map<String, Table> getTableMap() {
     final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
     for (TableMetadata table : getKeyspace().getTables()) {
       String tableName = table.getName();
       builder.put(tableName, new CassandraTable(this, tableName));
+
+      for (MaterializedViewMetadata view : table.getViews()) {
+        String viewName = view.getName();
+        builder.put(viewName, new CassandraTable(this, viewName, true));
+      }
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
index 7e52717..c380721 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
@@ -35,7 +35,7 @@ public class CassandraSchemaFactory implements SchemaFactory {
     Map map = (Map) operand;
     String host = (String) map.get("host");
     String keyspace = (String) map.get("keyspace");
-    return new CassandraSchema(host, keyspace);
+    return new CassandraSchema(host, keyspace, parentSchema, name);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
index 323d472..80f131e 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -57,11 +57,17 @@ public class CassandraTable extends AbstractQueryableTable
   List<RelFieldCollation> clusteringOrder;
   private final CassandraSchema schema;
   private final String columnFamily;
+  private final boolean view;
 
-  public CassandraTable(CassandraSchema schema, String columnFamily) {
+  public CassandraTable(CassandraSchema schema, String columnFamily, boolean view) {
     super(Object[].class);
     this.schema = schema;
     this.columnFamily = columnFamily;
+    this.view = view;
+  }
+
+  public CassandraTable(CassandraSchema schema, String columnFamily) {
+    this(schema, columnFamily, false);
   }
 
   public String toString() {
@@ -70,21 +76,21 @@ public class CassandraTable extends AbstractQueryableTable
 
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
     if (protoRowType == null) {
-      protoRowType = schema.getRelDataType(columnFamily);
+      protoRowType = schema.getRelDataType(columnFamily, view);
     }
     return protoRowType.apply(typeFactory);
   }
 
   public Pair<List<String>, List<String>> getKeyFields() {
     if (keyFields == null) {
-      keyFields = schema.getKeyFields(columnFamily);
+      keyFields = schema.getKeyFields(columnFamily, view);
     }
     return keyFields;
   }
 
   public List<RelFieldCollation> getClusteringOrder() {
     if (clusteringOrder == null) {
-      clusteringOrder = schema.getClusteringOrder(columnFamily);
+      clusteringOrder = schema.getClusteringOrder(columnFamily, view);
     }
     return clusteringOrder;
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
index 8ef08c5..8cf6673 100644
--- a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
@@ -97,6 +97,15 @@ public class CassandraAdapterIT {
                 + "    CassandraSort(fetch=[1])\n"
                 + "      CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
   }
+
+  @Test public void testMaterializedView() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select \"tweet_id\" from \"tweets\" where \"username\"='JmuhsAaMdw'")
+        .enableMaterializations(true)
+        .explainContains("CassandraTableScan(table=[[twissandra, tweets_by_user]])");
+  }
 }
 
 // End CassandraAdapterIT.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/aacb2375/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c45df4e..65b4626 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@ limitations under the License.
     <airlift-tpch.version>0.1</airlift-tpch.version>
     <avatica.version>1.7.0-SNAPSHOT</avatica.version>
     <build-helper-maven-plugin.version>1.9</build-helper-maven-plugin.version>
-    <cassandra-driver-core.version>2.1.9</cassandra-driver-core.version>
+    <cassandra-driver-core.version>3.0.0</cassandra-driver-core.version>
     <checksum-maven-plugin.version>1.2</checksum-maven-plugin.version>
     <commons-dbcp.version>1.4</commons-dbcp.version>
     <commons-lang3.version>3.2</commons-lang3.version>