You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/02/20 00:35:24 UTC
[2/5] calcite git commit: [CALCITE-1080] Cassandra adapter (Michael
Mior)
[CALCITE-1080] Cassandra adapter (Michael Mior)
Close apache/calcite#195
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/91887366
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/91887366
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/91887366
Branch: refs/heads/master
Commit: 91887366c40310f2435b2677ac0d9616bed842cb
Parents: 95fd041
Author: Michael Mior <mm...@uwaterloo.ca>
Authored: Thu Feb 18 14:49:20 2016 -0500
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Feb 18 21:16:50 2016 -0800
----------------------------------------------------------------------
cassandra/pom.xml | 149 ++++++++
.../adapter/cassandra/CassandraEnumerator.java | 113 ++++++
.../adapter/cassandra/CassandraFilter.java | 277 ++++++++++++++
.../adapter/cassandra/CassandraMethod.java | 51 +++
.../adapter/cassandra/CassandraProject.java | 79 ++++
.../calcite/adapter/cassandra/CassandraRel.java | 75 ++++
.../adapter/cassandra/CassandraRules.java | 377 +++++++++++++++++++
.../adapter/cassandra/CassandraSchema.java | 167 ++++++++
.../cassandra/CassandraSchemaFactory.java | 42 +++
.../adapter/cassandra/CassandraSort.java | 89 +++++
.../adapter/cassandra/CassandraTable.java | 205 ++++++++++
.../adapter/cassandra/CassandraTableScan.java | 78 ++++
.../CassandraToEnumerableConverter.java | 143 +++++++
.../CassandraToEnumerableConverterRule.java | 43 +++
.../calcite/adapter/cassandra/package-info.java | 28 ++
.../apache/calcite/test/CassandraAdapterIT.java | 102 +++++
cassandra/src/test/resources/model.json | 31 ++
pom.xml | 7 +
site/_docs/adapter.md | 1 +
sqlline | 2 +-
sqlline.bat | 2 +-
21 files changed, 2059 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
new file mode 100644
index 0000000..42fc6bf
--- /dev/null
+++ b/cassandra/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>calcite-cassandra</artifactId>
+ <packaging>jar</packaging>
+ <version>1.7.0-SNAPSHOT</version>
+ <name>Calcite Cassandra</name>
+ <description>Cassandra adapter for Calcite</description>
+
+ <properties>
+ <top.dir>${project.basedir}/..</top.dir>
+ </properties>
+
+ <dependencies>
+ <!-- Sorted by groupId, artifactId; calcite dependencies first. Put versions
+ in dependencyManagement in the root POM, not here. -->
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-linq4j</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Sorted by groupId, artifactId. Put versions in
+ pluginManagement in the root POM, not here. -->
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>analyze</id>
+ <goals>
+ <goal>analyze-only</goal>
+ </goals>
+ <configuration>
+ <failOnWarning>true</failOnWarning>
+ <!-- ignore "unused but declared" warnings -->
+ <ignoredUnusedDeclaredDependencies>
+ <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-api</ignoredUnusedDeclaredDependency>
+ <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-log4j12</ignoredUnusedDeclaredDependency>
+ </ignoredUnusedDeclaredDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-release-plugin</artifactId>
+ </plugin>
+ <!-- Parent module has the same plugin and does the work of
+ generating -sources.jar for each project. But without the
+ plugin declared here, IDEs don't know the sources are
+ available. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ <goal>test-jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>non-root-resources</id>
+ <goals>
+ <goal>process</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
new file mode 100644
index 0000000..0c06800
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+
+import java.util.Iterator;
+import java.util.List;
+
+/** Enumerator that reads from a Cassandra column family. */
+class CassandraEnumerator implements Enumerator<Object> {
+ private Iterator<Row> iterator;
+ private Row current;
+ private List<RelDataTypeField> fieldTypes;
+
+ /** Creates a CassandraEnumerator.
+ *
+ * @param results Cassandra result set ({@link com.datastax.driver.core.ResultSet})
+ * @param protoRowType The type of resulting rows
+ */
+ public CassandraEnumerator(ResultSet results, RelProtoDataType protoRowType) {
+ this.iterator = results.iterator();
+ this.current = null;
+
+ final RelDataTypeFactory typeFactory =
+ new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ this.fieldTypes = protoRowType.apply(typeFactory).getFieldList();
+ }
+
+ /** Produce the next row from the results
+ *
+ * @return A new row from the results
+ */
+ public Object current() {
+ if (fieldTypes.size() == 1) {
+ // If we just have one field, produce it directly
+ return currentRowField(0, fieldTypes.get(0).getType().getSqlTypeName());
+ } else {
+ // Build an array with all fields in this row
+ Object[] row = new Object[fieldTypes.size()];
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ row[i] = currentRowField(i, fieldTypes.get(i).getType().getSqlTypeName());
+ }
+
+ return row;
+ }
+ }
+
+ /** Get a field for the current row from the underlying object.
+ *
+ * @param index Index of the field within the Row object
+ * @param typeName Type of the field in this row
+ */
+ private Object currentRowField(int index, SqlTypeName typeName) {
+ DataType type = current.getColumnDefinitions().getType(index);
+ if (type == DataType.ascii() || type == DataType.text() || type == DataType.varchar()) {
+ return current.getString(index);
+ } else if (type == DataType.cint() || type == DataType.varint()) {
+ return current.getInt(index);
+ } else if (type == DataType.bigint()) {
+ return current.getLong(index);
+ } else if (type == DataType.cdouble() || type == DataType.cfloat()) {
+ return current.getDouble(index);
+ } else if (type == DataType.uuid() || type == DataType.timeuuid()) {
+ return current.getUUID(index).toString();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean moveNext() {
+ if (iterator.hasNext()) {
+ current = iterator.next();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() {
+ // Nothing to do here
+ }
+}
+
+// End CassandraEnumerator.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
new file mode 100644
index 0000000..41eb3ad
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Implementation of a {@link org.apache.calcite.rel.core.Filter}
+ * relational expression in Cassandra.
+ */
+public class CassandraFilter extends Filter implements CassandraRel {
+  private final List<String> partitionKeys;
+  private final boolean singlePartition;
+  private final List<String> clusteringKeys;
+  private final List<RelFieldCollation> implicitFieldCollations;
+  private final RelCollation implicitCollation;
+  private final String match;
+
+  /** Creates a CassandraFilter and eagerly translates its condition to CQL.
+   * @param partitionKeys Names of the partition key columns
+   * @param clusteringKeys Names of the clustering key columns
+   * @param implicitFieldCollations One collation per clustering key, in key order
+   */
+  public CassandraFilter(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode child,
+      RexNode condition,
+      List<String> partitionKeys,
+      List<String> clusteringKeys,
+      List<RelFieldCollation> implicitFieldCollations) {
+    super(cluster, traitSet, child, condition);
+
+    this.partitionKeys = partitionKeys;
+    this.clusteringKeys = new ArrayList<String>(clusteringKeys);
+    this.implicitFieldCollations = implicitFieldCollations;
+
+    Translator translator =
+        new Translator(CassandraRules.cassandraFieldNames(getRowType()),
+            partitionKeys, clusteringKeys, implicitFieldCollations);
+    this.match = translator.translateMatch(condition);
+    this.singlePartition = translator.isSinglePartition();
+    this.implicitCollation = translator.getImplicitCollation();
+
+    assert getConvention() == CassandraRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  @Override public CassandraFilter copy(RelTraitSet traitSet, RelNode input,
+      RexNode condition) {
+    return new CassandraFilter(getCluster(), traitSet, input, condition,
+        partitionKeys, clusteringKeys, implicitFieldCollations);
+  }
+
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    implementor.add(null, Collections.singletonList(match));
+  }
+
+  /** Check if the filter restricts to a single partition.
+   *
+   * @return True if the filter will restrict the underlying to a single partition
+   */
+  public boolean isSinglePartition() {
+    return singlePartition;
+  }
+
+  /** Get the resulting collation by the clustering keys after filtering.
+   *
+   * @return The implicit collation based on the natural sorting by clustering keys
+   */
+  public RelCollation getImplicitCollation() {
+    return implicitCollation;
+  }
+
+  /** Translates {@link RexNode} expressions into Cassandra expression strings. */
+  static class Translator {
+    private final List<String> fieldNames;
+    private final Set<String> partitionKeys;
+    private final List<String> clusteringKeys;
+    private int restrictedClusteringKeys;
+    private final List<RelFieldCollation> implicitFieldCollations;
+
+    Translator(List<String> fieldNames, List<String> partitionKeys, List<String> clusteringKeys,
+        List<RelFieldCollation> implicitFieldCollations) {
+      this.fieldNames = fieldNames;
+      this.partitionKeys = new HashSet<String>(partitionKeys);
+      this.clusteringKeys = clusteringKeys;
+      this.restrictedClusteringKeys = 0;
+      this.implicitFieldCollations = implicitFieldCollations;
+    }
+
+    /** Check if the query spans only one partition.
+     *
+     * @return True if the matches translated so far have resulted in a single partition
+     */
+    public boolean isSinglePartition() {
+      return partitionKeys.isEmpty();
+    }
+
+    /** Infer the implicit collation from the unrestricted clustering keys.
+     *
+     * @return The collation of the filtered results
+     */
+    public RelCollation getImplicitCollation() {
+      // No collation applies if we aren't restricted to a single partition
+      if (!isSinglePartition()) {
+        return RelCollations.EMPTY;
+      }
+
+      // Pull out the correct fields along with their original collations
+      List<RelFieldCollation> fieldCollations = new ArrayList<RelFieldCollation>();
+      for (int i = restrictedClusteringKeys; i < clusteringKeys.size(); i++) {
+        int fieldIndex = fieldNames.indexOf(clusteringKeys.get(i));
+        RelFieldCollation.Direction direction = implicitFieldCollations.get(i).getDirection();
+        fieldCollations.add(new RelFieldCollation(fieldIndex, direction));
+      }
+
+      return RelCollations.of(fieldCollations);
+    }
+
+    /** Produce the CQL predicate string for the given condition.
+     *
+     * @param condition Condition to translate
+     * @return CQL predicate string
+     */
+    private String translateMatch(RexNode condition) {
+      // CQL does not support disjunctions
+      List<RexNode> disjunctions = RelOptUtil.disjunctions(condition);
+      if (disjunctions.size() == 1) {
+        return translateAnd(disjunctions.get(0));
+      } else {
+        throw new AssertionError("cannot translate " + condition);
+      }
+    }
+
+    /** Render a literal as CQL text: strings are single-quoted with embedded
+     * quotes doubled, every other value is written unquoted.
+     * @param literal Literal to translate
+     * @return CQL representation of the literal
+     */
+    private static String literalValue(RexLiteral literal) {
+      final Object value = literal.getValue2();
+      if (value instanceof String) {
+        return "'" + ((String) value).replace("'", "''") + "'";
+      }
+      return String.valueOf(value);
+    }
+
+    /** Translate a conjunctive predicate to a CQL string.
+     *
+     * @param condition A conjunctive predicate
+     * @return CQL string for the predicate
+     */
+    private String translateAnd(RexNode condition) {
+      List<String> predicates = new ArrayList<String>();
+      for (RexNode node : RelOptUtil.conjunctions(condition)) {
+        predicates.add(translateMatch2(node));
+      }
+
+      return Util.toString(predicates, "", " AND ", "");
+    }
+
+    /** Translate a binary relation. */
+    private String translateMatch2(RexNode node) {
+      // We currently only use equality, but inequalities on clustering keys
+      // should be possible in the future
+      switch (node.getKind()) {
+      case EQUALS:
+        return translateBinary("=", "=", (RexCall) node);
+      case LESS_THAN:
+        return translateBinary("<", ">", (RexCall) node);
+      case LESS_THAN_OR_EQUAL:
+        return translateBinary("<=", ">=", (RexCall) node);
+      case GREATER_THAN:
+        return translateBinary(">", "<", (RexCall) node);
+      case GREATER_THAN_OR_EQUAL:
+        return translateBinary(">=", "<=", (RexCall) node);
+      default:
+        throw new AssertionError("cannot translate " + node);
+      }
+    }
+
+    /** Translates a call to a binary operator, reversing arguments if
+     * necessary. */
+    private String translateBinary(String op, String rop, RexCall call) {
+      final RexNode left = call.operands.get(0);
+      final RexNode right = call.operands.get(1);
+      String expression = translateBinary2(op, left, right);
+      if (expression != null) {
+        return expression;
+      }
+      expression = translateBinary2(rop, right, left);
+      if (expression != null) {
+        return expression;
+      }
+      throw new AssertionError("cannot translate op " + op + " call " + call);
+    }
+
+    /** Translates a call to a binary operator. Returns null on failure. */
+    private String translateBinary2(String op, RexNode left, RexNode right) {
+      switch (right.getKind()) {
+      case LITERAL:
+        break;
+      default:
+        return null;
+      }
+      final RexLiteral rightLiteral = (RexLiteral) right;
+      switch (left.getKind()) {
+      case INPUT_REF:
+        final RexInputRef left1 = (RexInputRef) left;
+        String name = fieldNames.get(left1.getIndex());
+        return translateOp2(op, name, rightLiteral);
+      case CAST:
+        // FIXME This will not work in all cases (for example, we ignore string encoding)
+        return translateBinary2(op, ((RexCall) left).operands.get(0), right);
+      default:
+        return null;
+      }
+    }
+
+    /** Combines a field name, operator, and literal to produce a predicate string. */
+    private String translateOp2(String op, String name, RexLiteral right) {
+      // In case this is a key, record that it is now restricted
+      if (op.equals("=")) {
+        partitionKeys.remove(name);
+        if (clusteringKeys.contains(name)) {
+          restrictedClusteringKeys++;
+        }
+      }
+
+      return name + " " + op + " " + literalValue(right);
+    }
+  }
+}
+
+// End CassandraFilter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
new file mode 100644
index 0000000..8f6a59a
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.linq4j.tree.Types;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Builtin methods in the Cassandra adapter.
+ */
+public enum CassandraMethod {
+  CASSANDRA_QUERYABLE_QUERY(CassandraTable.CassandraQueryable.class, "query",
+      List.class, List.class, List.class, String.class);
+
+  /** Lookup from each reflected method to the constant that wraps it. */
+  public static final ImmutableMap<Method, CassandraMethod> MAP;
+  /** Reflected method this constant stands for. */
+  public final Method method;
+
+  static {
+    final ImmutableMap.Builder<Method, CassandraMethod> lookup = ImmutableMap.builder();
+    for (CassandraMethod entry : values()) {
+      lookup.put(entry.method, entry);
+    }
+    MAP = lookup.build();
+  }
+
+  CassandraMethod(Class clazz, String methodName, Class... argumentTypes) {
+    method = Types.lookupMethod(clazz, methodName, argumentTypes);
+  }
+}
+
+// End CassandraMethod.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
new file mode 100644
index 0000000..953e89d
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Cassandra.
+ */
+public class CassandraProject extends Project implements CassandraRel {
+  /** Creates a CassandraProject; input and project must share the Cassandra convention. */
+  public CassandraProject(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traitSet, input, projects, rowType);
+    assert getConvention() == CassandraRel.CONVENTION;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public Project copy(RelTraitSet traitSet, RelNode input,
+      List<RexNode> projects, RelDataType rowType) {
+    return new CassandraProject(getCluster(), traitSet, input, projects,
+        rowType);
+  }
+
+  /** Scales the default project cost by 0.1. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  /** Visits the input, then adds one select-list entry per projected field.
+   * A field whose output name differs from its translated expression is
+   * emitted as {@code expr AS name}. */
+  @Override public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    final CassandraRules.RexToCassandraTranslator translator =
+        new CassandraRules.RexToCassandraTranslator(
+            (JavaTypeFactory) getCluster().getTypeFactory(),
+            CassandraRules.cassandraFieldNames(getInput().getRowType()));
+    final List<String> fields = new ArrayList<String>();
+    for (Pair<RexNode, String> pair : getNamedProjects()) {
+      final String name = pair.right;
+      final String expr = pair.left.accept(translator);
+      // CQL writes an alias as "<selector> AS <alias>", so the expression comes first
+      fields.add(name.equals(expr) ? name : expr + " AS " + name);
+    }
+    implementor.add(fields, null);
+  }
+}
+
+// End CassandraProject.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
new file mode 100644
index 0000000..c7f6174
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Relational expression that uses Cassandra calling convention.
+ */
+public interface CassandraRel extends RelNode {
+  /** Calling convention for relational operations that occur in Cassandra. */
+  Convention CONVENTION = new Convention.Impl("CASSANDRA", CassandraRel.class);
+
+  void implement(Implementor implementor);
+
+  /** Callback that collects the parts of a CQL query from a {@link CassandraRel} tree. */
+  class Implementor {
+    final List<String> selectFields = new ArrayList<String>();
+    final List<String> whereClause = new ArrayList<String>();
+    final List<String> order = new ArrayList<String>();
+    String limitValue = null;
+    RelOptTable table;
+    CassandraTable cassandraTable;
+
+    /** Records extra projected fields and predicates; a null list adds nothing.
+     * @param fields New fields to be projected from a query
+     * @param predicates New predicates to be applied to the query
+     */
+    public void add(List<String> fields, List<String> predicates) {
+      appendIfPresent(selectFields, fields);
+      appendIfPresent(whereClause, predicates);
+    }
+
+    /** Appends {@code extra} to {@code target} unless {@code extra} is null. */
+    private static void appendIfPresent(List<String> target, List<String> extra) {
+      if (extra != null) {
+        target.addAll(extra);
+      }
+    }
+
+    public void addOrder(List<String> newOrder) {
+      order.addAll(newOrder);
+    }
+
+    public void setLimit(String limit) {
+      limitValue = limit;
+    }
+
+    public void visitChild(int ordinal, RelNode input) {
+      assert ordinal == 0;
+      ((CassandraRel) input).implement(this);
+    }
+  }
+}
+
+// End CassandraRel.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
new file mode 100644
index 0000000..72cbcd5
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+import java.util.AbstractList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Rules and relational operators for
+ * {@link CassandraRel#CONVENTION}
+ * calling convention.
+ */
+public class CassandraRules {
+ /** Private constructor; nothing in this file instantiates the class. */
+ private CassandraRules() {}
+
+ /** The planner rules defined in this class, listed as filter, project,
+ * sort. */
+ public static final RelOptRule[] RULES = {
+ CassandraFilterRule.INSTANCE,
+ CassandraProjectRule.INSTANCE,
+ CassandraSortRule.INSTANCE
+ };
+
+ static List<String> cassandraFieldNames(final RelDataType rowType) {
+ return SqlValidatorUtil.uniquify(
+ new AbstractList<String>() {
+ @Override public String get(int index) {
+ return rowType.getFieldList().get(index).getName();
+ }
+
+ @Override public int size() {
+ return rowType.getFieldCount();
+ }
+ });
+ }
+
+ /** Converts {@link RexNode} expressions into strings in Cassandra's
+  * expression language. */
+ static class RexToCassandraTranslator extends RexVisitorImpl<String> {
+   private final JavaTypeFactory typeFactory;
+   private final List<String> inFields;
+
+   /** Creates a translator.
+    *
+    * @param typeFactory Type factory, stored for later use
+    * @param inFields Names of the input fields, indexed by field ordinal
+    */
+   protected RexToCassandraTranslator(JavaTypeFactory typeFactory,
+       List<String> inFields) {
+     super(true);
+     this.inFields = inFields;
+     this.typeFactory = typeFactory;
+   }
+
+   /** Renders an input reference as the name found at its index in the list
+    * of input field names. */
+   @Override public String visitInputRef(RexInputRef inputRef) {
+     final int ordinal = inputRef.getIndex();
+     return inFields.get(ordinal);
+   }
+ }
+
+ /** Common superclass of the planner rules that convert a relational
+  * expression from {@link Convention#NONE} to
+  * {@link CassandraRel#CONVENTION}. */
+ abstract static class CassandraConverterRule extends ConverterRule {
+   /** Target convention of the rule; always {@link CassandraRel#CONVENTION}. */
+   protected final Convention out;
+
+   /** Creates a rule that only matches nodes accepted by a predicate.
+    *
+    * @param clazz Class of relational expression to match
+    * @param predicate Additional condition the expression must satisfy
+    * @param description Description of the rule
+    */
+   public <R extends RelNode> CassandraConverterRule(
+       Class<R> clazz,
+       Predicate<? super R> predicate,
+       String description) {
+     super(clazz, predicate, Convention.NONE, CassandraRel.CONVENTION, description);
+     this.out = CassandraRel.CONVENTION;
+   }
+
+   /** Creates a rule that matches every node of the given class.
+    *
+    * @param clazz Class of relational expression to match
+    * @param description Description of the rule
+    */
+   public CassandraConverterRule(
+       Class<? extends RelNode> clazz,
+       String description) {
+     this(clazz, Predicates.<RelNode>alwaysTrue(), description);
+   }
+ }
+
+ /**
+ * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalFilter} to a
+ * {@link CassandraFilter}.
+ */
+ private static class CassandraFilterRule extends RelOptRule {
+ private static final Predicate<LogicalFilter> PREDICATE =
+ new Predicate<LogicalFilter>() {
+ public boolean apply(LogicalFilter input) {
+ // TODO: Check for an equality predicate on the partition key
+ // Right now this just checks if we have a single top-level AND
+ return RelOptUtil.disjunctions(input.getCondition()).size() == 1;
+ }
+ };
+
+ private static final CassandraFilterRule INSTANCE = new CassandraFilterRule();
+
+ private CassandraFilterRule() {
+ super(operand(LogicalFilter.class, operand(CassandraTableScan.class, none())),
+ "CassandraFilterRule");
+ }
+
+ @Override public boolean matches(RelOptRuleCall call) {
+ // Get the condition from the filter operation
+ LogicalFilter filter = call.rel(0);
+ RexNode condition = filter.getCondition();
+
+ // Get field names from the scan operation
+ CassandraTableScan scan = call.rel(1);
+ Pair<List<String>, List<String>> keyFields = scan.cassandraTable.getKeyFields();
+ Set<String> partitionKeys = new HashSet<String>(keyFields.left);
+ List<String> fieldNames = CassandraRules.cassandraFieldNames(filter.getInput().getRowType());
+
+ List<RexNode> disjunctions = RelOptUtil.disjunctions(condition);
+ if (disjunctions.size() != 1) {
+ return false;
+ } else {
+ // Check that all conjunctions are primary key equalities
+ condition = disjunctions.get(0);
+ for (RexNode predicate : RelOptUtil.conjunctions(condition)) {
+ if (!isEqualityOnKey(predicate, fieldNames, partitionKeys, keyFields.right)) {
+ return false;
+ }
+ }
+ }
+
+ // Either all of the partition keys must be specified or none
+ return partitionKeys.size() == keyFields.left.size() || partitionKeys.size() == 0;
+ }
+
+ /** Check if the node is a supported predicate (primary key equality).
+ *
+ * @param node Condition node to check
+ * @param fieldNames Names of all columns in the table
+ * @param partitionKeys Names of primary key columns
+ * @param clusteringKeys Names of primary key columns
+ * @return True if the node represents an equality predicate on a primary key
+ */
+ private boolean isEqualityOnKey(RexNode node, List<String> fieldNames,
+ Set<String> partitionKeys, List<String> clusteringKeys) {
+ if (node.getKind() != SqlKind.EQUALS) {
+ return false;
+ }
+
+ RexCall call = (RexCall) node;
+ final RexNode left = call.operands.get(0);
+ final RexNode right = call.operands.get(1);
+ String key = compareFieldWithLiteral(left, right, fieldNames);
+ if (key == null) {
+ key = compareFieldWithLiteral(right, left, fieldNames);
+ }
+ if (key != null) {
+ return partitionKeys.remove(key) || clusteringKeys.contains(key);
+ } else {
+ return false;
+ }
+ }
+
+ /** Check if an equality operation is comparing a primary key column with a literal.
+ *
+ * @param left Left operand of the equality
+ * @param right Right operand of the equality
+ * @param fieldNames Names of all columns in the table
+ * @return The field being compared or null if there is no key equality
+ */
+ private String compareFieldWithLiteral(RexNode left, RexNode right, List<String> fieldNames) {
+ // FIXME Ignore casts for new and assume they aren't really necessary
+ if (left.isA(SqlKind.CAST)) {
+ left = ((RexCall) left).getOperands().get(0);
+ }
+
+ if (left.isA(SqlKind.INPUT_REF) && right.isA(SqlKind.LITERAL)) {
+ final RexInputRef left1 = (RexInputRef) left;
+ String name = fieldNames.get(left1.getIndex());
+ return name;
+ } else {
+ return null;
+ }
+ }
+
+ /** @see org.apache.calcite.rel.convert.ConverterRule */
+ public void onMatch(RelOptRuleCall call) {
+ LogicalFilter filter = call.rel(0);
+ CassandraTableScan scan = call.rel(1);
+ if (filter.getTraitSet().contains(Convention.NONE)) {
+ final RelNode converted = convert(filter, scan);
+ if (converted != null) {
+ call.transformTo(converted);
+ }
+ }
+ }
+
+ public RelNode convert(LogicalFilter filter, CassandraTableScan scan) {
+ final RelTraitSet traitSet = filter.getTraitSet().replace(CassandraRel.CONVENTION);
+ final Pair<List<String>, List<String>> keyFields = scan.cassandraTable.getKeyFields();
+ return new CassandraFilter(
+ filter.getCluster(),
+ traitSet,
+ convert(filter.getInput(), CassandraRel.CONVENTION),
+ filter.getCondition(),
+ keyFields.left,
+ keyFields.right,
+ scan.cassandraTable.getClusteringOrder());
+ }
+ }
+
+ /**
+  * Planner rule that turns a
+  * {@link org.apache.calcite.rel.logical.LogicalProject} into a
+  * {@link CassandraProject}.
+  */
+ private static class CassandraProjectRule extends CassandraConverterRule {
+   private static final CassandraProjectRule INSTANCE = new CassandraProjectRule();
+
+   private CassandraProjectRule() {
+     super(LogicalProject.class, "CassandraProjectRule");
+   }
+
+   /** Creates the Cassandra project, converting the input to the target
+    * convention and keeping the projections and row type of the original. */
+   public RelNode convert(RelNode rel) {
+     final LogicalProject logicalProject = (LogicalProject) rel;
+     final RelTraitSet cassandraTraits = logicalProject.getTraitSet().replace(out);
+     final RelNode cassandraInput = convert(logicalProject.getInput(), out);
+     return new CassandraProject(
+         logicalProject.getCluster(),
+         cassandraTraits,
+         cassandraInput,
+         logicalProject.getProjects(),
+         logicalProject.getRowType());
+   }
+ }
+
+ /**
+  * Planner rule that turns a {@link org.apache.calcite.rel.core.Sort} sitting
+  * above a {@link CassandraToEnumerableConverter} and a
+  * {@link CassandraFilter} into a {@link CassandraSort}.
+  */
+ private static class CassandraSortRule extends RelOptRule {
+   private static final Predicate<Sort> SORT_PREDICATE =
+       new Predicate<Sort>() {
+         public boolean apply(Sort input) {
+           // CQL has no support for offsets
+           return input.offset == null;
+         }
+       };
+   private static final Predicate<CassandraFilter> FILTER_PREDICATE =
+       new Predicate<CassandraFilter>() {
+         public boolean apply(CassandraFilter input) {
+           // We can only use implicit sorting within a single partition
+           return input.isSinglePartition();
+         }
+       };
+   private static final RelOptRuleOperand CASSANDRA_OP =
+       operand(CassandraToEnumerableConverter.class,
+           operand(CassandraFilter.class, null, FILTER_PREDICATE, any()));
+
+   // Must stay below the constants above: the constructor reads them
+   private static final CassandraSortRule INSTANCE = new CassandraSortRule();
+
+   private CassandraSortRule() {
+     super(operand(Sort.class, null, SORT_PREDICATE, CASSANDRA_OP), "CassandraSortRule");
+   }
+
+   /** Builds the {@link CassandraSort} that replaces a sort.
+    *
+    * @param sort Sort being converted
+    * @param filter Filter below the sort; supplies the implicit collation
+    * @return The equivalent sort in Cassandra convention
+    */
+   public RelNode convert(Sort sort, CassandraFilter filter) {
+     final RelCollation requested = sort.getCollation();
+     final RelTraitSet traitSet =
+         sort.getTraitSet().replace(CassandraRel.CONVENTION).replace(requested);
+     // The input is requested in the same traits, but without any collation
+     final RelTraitSet inputTraits = traitSet.replace(RelCollations.EMPTY);
+     return new CassandraSort(sort.getCluster(), traitSet,
+         convert(sort.getInput(), inputTraits),
+         sort.getCollation(), filter.getImplicitCollation(), sort.fetch);
+   }
+
+   public boolean matches(RelOptRuleCall call) {
+     final Sort sort = call.rel(0);
+     final CassandraFilter filter = call.rel(2);
+     final RelCollation requested = sort.getCollation();
+     final RelCollation implicit = filter.getImplicitCollation();
+     return collationsCompatible(requested, implicit);
+   }
+
+   /** Decides whether a requested collation can be produced from the
+    * implicit one, either as it is or with every direction reversed.
+    *
+    * @param sortCollation Collation requested by the sort
+    * @param implicitCollation Collation the filter provides implicitly
+    * @return True if it is possible to achieve this sort in Cassandra
+    */
+   private boolean collationsCompatible(RelCollation sortCollation,
+       RelCollation implicitCollation) {
+     final List<RelFieldCollation> wanted = sortCollation.getFieldCollations();
+     final List<RelFieldCollation> available = implicitCollation.getFieldCollations();
+     final int wantedCount = wanted.size();
+
+     if (wantedCount > available.size()) {
+       return false;
+     }
+     if (wantedCount == 0) {
+       return true;
+     }
+
+     // The first field decides whether the implicit order has to be read
+     // in reverse
+     final boolean reversed =
+         reverseDirection(wanted.get(0).getDirection()) == available.get(0).getDirection();
+
+     for (int i = 0; i < wantedCount; i++) {
+       final RelFieldCollation sorted = wanted.get(i);
+       final RelFieldCollation implied = available.get(i);
+
+       // The same field must appear at the same position
+       if (sorted.getFieldIndex() != implied.getFieldIndex()) {
+         return false;
+       }
+
+       // Every field has to agree with the decision taken above: all in
+       // the implicit direction, or all in the opposite one
+       final RelFieldCollation.Direction sortDirection = sorted.getDirection();
+       final RelFieldCollation.Direction needed =
+           reversed ? reverseDirection(sortDirection) : sortDirection;
+       if (needed != implied.getDirection()) {
+         return false;
+       }
+     }
+
+     return true;
+   }
+
+   /** Find the reverse of a given collation direction.
+    *
+    * @param direction Direction to reverse
+    * @return Reverse of the input direction, or null if the direction is
+    *   neither ascending nor descending
+    */
+   private RelFieldCollation.Direction reverseDirection(RelFieldCollation.Direction direction) {
+     final RelFieldCollation.Direction opposite;
+     switch (direction) {
+     case ASCENDING:
+     case STRICTLY_ASCENDING:
+       opposite = RelFieldCollation.Direction.DESCENDING;
+       break;
+     case DESCENDING:
+     case STRICTLY_DESCENDING:
+       opposite = RelFieldCollation.Direction.ASCENDING;
+       break;
+     default:
+       opposite = null;
+       break;
+     }
+     return opposite;
+   }
+
+   /** @see org.apache.calcite.rel.convert.ConverterRule */
+   public void onMatch(RelOptRuleCall call) {
+     final Sort sort = call.rel(0);
+     final CassandraFilter filter = call.rel(2);
+     final RelNode replacement = convert(sort, filter);
+     if (replacement != null) {
+       call.transformTo(replacement);
+     }
+   }
+ }
+}
+
+// End CassandraRules.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
new file mode 100644
index 0000000..4745aa1
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Schema mapped onto a Cassandra keyspace; every table (column family) of
+ * the keyspace is exposed as a {@link CassandraTable}.
+ */
+public class CassandraSchema extends AbstractSchema {
+  final Session session;
+  final String keyspace;
+
+  /**
+   * Creates a Cassandra schema.
+   *
+   * @param host Cassandra host, e.g. "localhost"
+   * @param keyspace Cassandra keyspace name, e.g. "twissandra"
+   * @throws RuntimeException wrapping any exception raised while building
+   *   the cluster or connecting to the keyspace
+   */
+  public CassandraSchema(String host, String keyspace) {
+    super();
+
+    this.keyspace = keyspace;
+    Cluster cluster = null;
+    try {
+      cluster = Cluster.builder().addContactPoint(host).build();
+      this.session = cluster.connect(keyspace);
+    } catch (Exception e) {
+      // The cluster was built but no session came out of it: close it here,
+      // because nothing else holds a reference through which it could be
+      // closed later
+      if (cluster != null) {
+        try {
+          cluster.close();
+        } catch (RuntimeException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Derives the row type of a column family from the column metadata of
+   * the keyspace. Every field is nullable.
+   *
+   * @param columnFamily Name of the column family
+   * @return Proto-type describing the columns of the column family
+   */
+  RelProtoDataType getRelDataType(String columnFamily) {
+    List<ColumnMetadata> columns = getKeyspace().getTable(columnFamily).getColumns();
+
+    // Temporary type factory, just for the duration of this method. Allowable
+    // because we're creating a proto-type, not a type; before being used, the
+    // proto-type will be copied into a real type factory.
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
+    for (ColumnMetadata column : columns) {
+      final String columnName = column.getName();
+      final DataType type = column.getType();
+
+      // TODO: This mapping of types can be done much better
+      // NOTE(review): the comparisons below are by reference; this assumes the
+      // driver hands out one shared instance per native type - confirm
+      // Any type not listed below is mapped to ANY
+      SqlTypeName typeName = SqlTypeName.ANY;
+      if (type == DataType.ascii() || type == DataType.text() || type == DataType.varchar()
+          || type == DataType.uuid() || type == DataType.timeuuid()) {
+        typeName = SqlTypeName.CHAR;
+      } else if (type == DataType.cint() || type == DataType.varint()) {
+        typeName = SqlTypeName.INTEGER;
+      } else if (type == DataType.bigint()) {
+        typeName = SqlTypeName.BIGINT;
+      } else if (type == DataType.cdouble() || type == DataType.cfloat()
+          || type == DataType.decimal()) {
+        typeName = SqlTypeName.DOUBLE;
+      }
+
+      fieldInfo.add(columnName, typeFactory.createSqlType(typeName)).nullable(true);
+    }
+
+    return RelDataTypeImpl.proto(fieldInfo.build());
+  }
+
+  /**
+   * Get all primary key columns from the underlying CQL table
+   *
+   * @param columnFamily Name of the column family
+   * @return A pair of immutable lists: on the left the names of the partition
+   *   key columns, on the right the names of the clustering key columns
+   */
+  Pair<List<String>, List<String>> getKeyFields(String columnFamily) {
+    TableMetadata table = getKeyspace().getTable(columnFamily);
+
+    List<ColumnMetadata> partitionKey = table.getPartitionKey();
+    List<String> pKeyFields = new ArrayList<String>();
+    for (ColumnMetadata column : partitionKey) {
+      pKeyFields.add(column.getName());
+    }
+
+    List<ColumnMetadata> clusteringKey = table.getClusteringColumns();
+    List<String> cKeyFields = new ArrayList<String>();
+    for (ColumnMetadata column : clusteringKey) {
+      cKeyFields.add(column.getName());
+    }
+
+    return Pair.of((List<String>) ImmutableList.copyOf(pKeyFields),
+        (List<String>) ImmutableList.copyOf(cKeyFields));
+  }
+
+  /** Get the collation of all clustering key columns.
+   *
+   * @param columnFamily Name of the column family
+   * @return One field collation per clustering key; the field index of each
+   *   is the position of the key within the clustering order, starting at 0
+   */
+  public List<RelFieldCollation> getClusteringOrder(String columnFamily) {
+    TableMetadata table = getKeyspace().getTable(columnFamily);
+    List<TableMetadata.Order> clusteringOrder = table.getClusteringOrder();
+    List<RelFieldCollation> keyCollations = new ArrayList<RelFieldCollation>();
+
+    int i = 0;
+    for (TableMetadata.Order order : clusteringOrder) {
+      RelFieldCollation.Direction direction;
+      switch (order) {
+      case DESC:
+        direction = RelFieldCollation.Direction.DESCENDING;
+        break;
+      case ASC:
+      default:
+        direction = RelFieldCollation.Direction.ASCENDING;
+        break;
+      }
+      keyCollations.add(new RelFieldCollation(i, direction));
+      i++;
+    }
+
+    return keyCollations;
+  }
+
+  /** Builds a fresh map with one {@link CassandraTable} per table reported
+   * by the keyspace metadata, keyed by table name. */
+  @Override protected Map<String, Table> getTableMap() {
+    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+    for (TableMetadata table : getKeyspace().getTables()) {
+      String tableName = table.getName();
+      builder.put(tableName, new CassandraTable(this, tableName));
+    }
+    return builder.build();
+  }
+
+  /** Looks up the metadata of this schema's keyspace through the session's
+   * cluster. */
+  private KeyspaceMetadata getKeyspace() {
+    return session.getCluster().getMetadata().getKeyspace(keyspace);
+  }
+}
+
+// End CassandraSchema.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
new file mode 100644
index 0000000..7e52717
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.util.Map;
+
+/**
+ * Factory that creates a {@link CassandraSchema}.
+ *
+ * <p>The schema is configured from the {@code "host"} and {@code "keyspace"}
+ * entries of the operand map.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class CassandraSchemaFactory implements SchemaFactory {
+  public CassandraSchemaFactory() {
+  }
+
+  /** Creates a {@link CassandraSchema} for the host and keyspace named in
+   * the operand.
+   *
+   * @param parentSchema Parent schema (not used)
+   * @param name Name of the schema (not used)
+   * @param operand Operand map; its "host" and "keyspace" values must be
+   *   strings
+   * @return The new schema
+   */
+  public Schema create(SchemaPlus parentSchema, String name,
+      Map<String, Object> operand) {
+    // The operand is already a typed map; no raw-typed alias is needed
+    final String host = (String) operand.get("host");
+    final String keyspace = (String) operand.get("keyspace");
+    return new CassandraSchema(host, keyspace);
+  }
+}
+
+// End CassandraSchemaFactory.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
new file mode 100644
index 0000000..89f23ed
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Sort}
+ * relational expression in Cassandra.
+ */
+public class CassandraSort extends Sort implements CassandraRel {
+  private final RelCollation implicitCollation;
+
+  /** Creates a CassandraSort. No offset is ever passed on to the base
+   * class.
+   *
+   * @param cluster Cluster
+   * @param traitSet Traits; the convention must be
+   *   {@link CassandraRel#CONVENTION}
+   * @param child Input; must have the same convention
+   * @param collation Collation requested by the sort
+   * @param implicitCollation Collation the input provides implicitly
+   * @param fetch Expression for the number of rows to fetch, or null
+   */
+  public CassandraSort(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode child, RelCollation collation, RelCollation implicitCollation, RexNode fetch) {
+    super(cluster, traitSet, child, collation, null, fetch);
+
+    this.implicitCollation = implicitCollation;
+
+    assert getConvention() == CassandraRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  /** Scales the cost computed by the base class by a factor of 0.05. */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+  }
+
+  /** Creates a copy with the given traits, input, collation and fetch. The
+   * offset is ignored, as in the constructor. */
+  @Override public Sort copy(RelTraitSet traitSet, RelNode input,
+      RelCollation newCollation, RexNode offset, RexNode fetch) {
+    // Use the collation handed in by the caller, not the one of this node
+    return new CassandraSort(getCluster(), traitSet, input, newCollation, implicitCollation,
+        fetch);
+  }
+
+  /** Visits the input, then records one "name ASC" or "name DESC" clause
+   * per sorted field and, if a fetch is present, the limit. */
+  public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+
+    List<RelFieldCollation> sortCollations = collation.getFieldCollations();
+    List<String> fieldOrder = new ArrayList<String>();
+    if (!sortCollations.isEmpty()) {
+      // Construct a series of order clauses from the desired collation
+      final List<RelDataTypeField> fields = getRowType().getFieldList();
+      for (RelFieldCollation fieldCollation : sortCollations) {
+        final String name =
+            fields.get(fieldCollation.getFieldIndex()).getName();
+        // Both descending variants must produce DESC
+        final RelFieldCollation.Direction sortDirection = fieldCollation.getDirection();
+        final boolean descending =
+            sortDirection == RelFieldCollation.Direction.DESCENDING
+                || sortDirection == RelFieldCollation.Direction.STRICTLY_DESCENDING;
+        fieldOrder.add(name + " " + (descending ? "DESC" : "ASC"));
+      }
+
+      implementor.addOrder(fieldOrder);
+    }
+    if (fetch != null) {
+      implementor.setLimit(((RexLiteral) fetch).getValue().toString());
+    }
+  }
+}
+
+// End CassandraSort.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
new file mode 100644
index 0000000..323d472
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table based on a Cassandra column family.
+ *
+ * <p>The row type, key fields and clustering order are fetched from the
+ * schema the first time they are asked for and kept afterwards.
+ */
+public class CassandraTable extends AbstractQueryableTable
+    implements TranslatableTable {
+  RelProtoDataType protoRowType;
+  Pair<List<String>, List<String>> keyFields;
+  List<RelFieldCollation> clusteringOrder;
+  private final CassandraSchema schema;
+  private final String columnFamily;
+
+  /** Creates a table.
+   *
+   * @param schema Schema the table belongs to
+   * @param columnFamily Name of the column family
+   */
+  public CassandraTable(CassandraSchema schema, String columnFamily) {
+    super(Object[].class);
+    this.schema = schema;
+    this.columnFamily = columnFamily;
+  }
+
+  public String toString() {
+    return "CassandraTable {" + columnFamily + "}";
+  }
+
+  /** Returns the row type of the table, asking the schema for it on the
+   * first call. */
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    if (protoRowType == null) {
+      protoRowType = schema.getRelDataType(columnFamily);
+    }
+    return protoRowType.apply(typeFactory);
+  }
+
+  /** Returns the key fields of the table as reported by the schema, asking
+   * the schema for them on the first call. */
+  public Pair<List<String>, List<String>> getKeyFields() {
+    if (keyFields == null) {
+      keyFields = schema.getKeyFields(columnFamily);
+    }
+    return keyFields;
+  }
+
+  /** Returns the clustering order of the table as reported by the schema,
+   * asking the schema for it on the first call. */
+  public List<RelFieldCollation> getClusteringOrder() {
+    if (clusteringOrder == null) {
+      clusteringOrder = schema.getClusteringOrder(columnFamily);
+    }
+    return clusteringOrder;
+  }
+
+  /** Queries the whole table: all fields, no predicates, no ordering and no
+   * limit.
+   *
+   * @param session Cassandra session
+   * @return Enumerable over the results
+   */
+  public Enumerable<Object> query(final Session session) {
+    return query(session, Collections.<Map.Entry<String, Class>>emptyList(),
+        Collections.<String>emptyList(), Collections.<String>emptyList(), null);
+  }
+
+  /** Executes a CQL query on the underlying table.
+   *
+   * @param session Cassandra session
+   * @param fields List of fields to project; if empty, all fields are selected
+   * @param predicates A list of predicates which should be used in the query;
+   *   they are combined with AND
+   * @param order A list of ordering clauses; if empty, no ORDER BY is issued
+   * @param limit Maximum number of rows to return, or null for no limit
+   * @return Enumerable over the results; the query is executed each time an
+   *   enumerator is requested
+   */
+  public Enumerable<Object> query(final Session session, List<Map.Entry<String, Class>> fields,
+      List<String> predicates, List<String> order, String limit) {
+    // Build the type of the resulting row based on the provided fields
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
+    // Go through getRowType rather than the field: the proto row type is
+    // only populated on demand and may still be null at this point
+    final RelDataType rowType = getRowType(typeFactory);
+    List<String> fieldNames = new ArrayList<String>();
+    for (Map.Entry<String, Class> field : fields) {
+      String fieldName = field.getKey();
+      fieldNames.add(fieldName);
+      SqlTypeName typeName = rowType.getField(fieldName, true, false).getType().getSqlTypeName();
+      fieldInfo.add(fieldName, typeFactory.createSqlType(typeName)).nullable(true);
+    }
+    final RelProtoDataType resultRowType = RelDataTypeImpl.proto(fieldInfo.build());
+
+    // Construct the list of fields to project
+    final String selectFields;
+    if (fields.isEmpty()) {
+      selectFields = "*";
+    } else {
+      selectFields = Util.toString(fieldNames, "", ", ", "");
+    }
+
+    // Combine all predicates conjunctively
+    String whereClause = "";
+    if (!predicates.isEmpty()) {
+      whereClause = " WHERE ";
+      whereClause += Util.toString(predicates, "", " AND ", "");
+    }
+
+    // Build and issue the query and return an Enumerator over the results
+    StringBuilder queryBuilder = new StringBuilder("SELECT ");
+    queryBuilder.append(selectFields);
+    queryBuilder.append(" FROM \"").append(columnFamily).append("\"");
+    queryBuilder.append(whereClause);
+    if (!order.isEmpty()) {
+      queryBuilder.append(Util.toString(order, " ORDER BY ", ", ", ""));
+    }
+    if (limit != null) {
+      queryBuilder.append(" LIMIT ").append(limit);
+    }
+    queryBuilder.append(" ALLOW FILTERING");
+    final String query = queryBuilder.toString();
+
+    return new AbstractEnumerable<Object>() {
+      public Enumerator<Object> enumerator() {
+        final ResultSet results = session.execute(query);
+        return new CassandraEnumerator(results, resultRowType);
+      }
+    };
+  }
+
+  public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+      SchemaPlus schema, String tableName) {
+    return new CassandraQueryable<>(queryProvider, schema, this, tableName);
+  }
+
+  /** Creates the {@link CassandraTableScan} for this table, in Cassandra
+   * convention and without a projected row type. */
+  public RelNode toRel(
+      RelOptTable.ToRelContext context,
+      RelOptTable relOptTable) {
+    final RelOptCluster cluster = context.getCluster();
+    return new CassandraTableScan(cluster, cluster.traitSetOf(CassandraRel.CONVENTION),
+        relOptTable, this, null);
+  }
+
+  /** Implementation of {@link org.apache.calcite.linq4j.Queryable} based on
+   * a {@link org.apache.calcite.adapter.cassandra.CassandraTable}. */
+  public static class CassandraQueryable<T> extends AbstractTableQueryable<T> {
+    public CassandraQueryable(QueryProvider queryProvider, SchemaPlus schema,
+        CassandraTable table, String tableName) {
+      super(queryProvider, schema, table, tableName);
+    }
+
+    /** Enumerates the whole table through the session of the schema. */
+    public Enumerator<T> enumerator() {
+      //noinspection unchecked
+      final Enumerable<T> enumerable =
+          (Enumerable<T>) getTable().query(getSession());
+      return enumerable.enumerator();
+    }
+
+    private CassandraTable getTable() {
+      return (CassandraTable) table;
+    }
+
+    private Session getSession() {
+      return schema.unwrap(CassandraSchema.class).session;
+    }
+
+    /** Called via code-generation.
+     *
+     * @see org.apache.calcite.adapter.cassandra.CassandraMethod#CASSANDRA_QUERYABLE_QUERY
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Enumerable<Object> query(List<Map.Entry<String, Class>> fields,
+        List<String> predicates, List<String> order, String limit) {
+      return getTable().query(getSession(), fields, predicates, order, limit);
+    }
+  }
+}
+
+// End CassandraTable.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java
new file mode 100644
index 0000000..3197d93
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+/**
+ * Relational expression representing a scan of a Cassandra table.
+ */
+public class CassandraTableScan extends TableScan implements CassandraRel {
+  final CassandraTable cassandraTable;
+  final RelDataType projectRowType;
+
+  /**
+   * Creates a CassandraTableScan.
+   *
+   * @param cluster        Cluster
+   * @param traitSet       Traits; must carry {@link CassandraRel#CONVENTION}
+   * @param table          Table
+   * @param cassandraTable Cassandra table; must not be null
+   * @param projectRowType Fields and types to project; null to project raw row
+   */
+  protected CassandraTableScan(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable table, CassandraTable cassandraTable, RelDataType projectRowType) {
+    super(cluster, traitSet, table);
+    this.cassandraTable = cassandraTable;
+    this.projectRowType = projectRowType;
+
+    assert cassandraTable != null;
+    assert getConvention() == CassandraRel.CONVENTION;
+  }
+
+  /** Returns this node unchanged; a scan takes no inputs, so {@code inputs}
+   * must be empty. */
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.isEmpty();
+    return this;
+  }
+
+  /** Uses the projected row type when one was supplied, otherwise the row
+   * type derived by the superclass. */
+  @Override public RelDataType deriveRowType() {
+    if (projectRowType == null) {
+      return super.deriveRowType();
+    }
+    return projectRowType;
+  }
+
+  /** Adds the Cassandra-to-enumerable converter rule, followed by every rule
+   * in {@link CassandraRules#RULES}, to the planner. */
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(CassandraToEnumerableConverterRule.INSTANCE);
+    for (RelOptRule cassandraRule : CassandraRules.RULES) {
+      planner.addRule(cassandraRule);
+    }
+  }
+
+  /** Records the table being scanned on the implementor. */
+  public void implement(Implementor implementor) {
+    implementor.table = table;
+    implementor.cassandraTable = cassandraTable;
+  }
+}
+
+// End CassandraTableScan.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
new file mode 100644
index 0000000..d79f047
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import java.util.AbstractList;
+import java.util.List;
+
+/**
+ * Relational expression representing a scan of a table in a Cassandra data source.
+ */
+public class CassandraToEnumerableConverter
+ extends ConverterImpl
+ implements EnumerableRel {
+ protected CassandraToEnumerableConverter(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new CassandraToEnumerableConverter(
+ getCluster(), traitSet, sole(inputs));
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq).multiplyBy(.1);
+ }
+
+ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+ // Generates a call to "query" with the appropriate fields and predicates
+ final BlockBuilder list = new BlockBuilder();
+ final CassandraRel.Implementor cassandraImplementor = new CassandraRel.Implementor();
+ cassandraImplementor.visitChild(0, getInput());
+ final RelDataType rowType = getRowType();
+ final PhysType physType =
+ PhysTypeImpl.of(
+ implementor.getTypeFactory(), rowType,
+ pref.prefer(JavaRowFormat.ARRAY));
+ final Expression fields =
+ list.append("fields",
+ constantArrayList(
+ Pair.zip(CassandraRules.cassandraFieldNames(rowType),
+ new AbstractList<Class>() {
+ @Override public Class get(int index) {
+ return physType.fieldClass(index);
+ }
+
+ @Override public int size() {
+ return rowType.getFieldCount();
+ }
+ }),
+ Pair.class));
+ final Expression table =
+ list.append("table",
+ cassandraImplementor.table.getExpression(
+ CassandraTable.CassandraQueryable.class));
+ final Expression predicates =
+ list.append("predicates",
+ constantArrayList(cassandraImplementor.whereClause, String.class));
+ final Expression order =
+ list.append("order",
+ constantArrayList(cassandraImplementor.order, String.class));
+ final Expression limit =
+ list.append("limit",
+ Expressions.constant(cassandraImplementor.limitValue));
+ Expression enumerable =
+ list.append("enumerable",
+ Expressions.call(table,
+ CassandraMethod.CASSANDRA_QUERYABLE_QUERY.method, fields,
+ predicates, order, limit));
+ if (CalcitePrepareImpl.DEBUG) {
+ System.out.println("Cassandra: " + predicates);
+ }
+ Hook.QUERY_PLAN.run(predicates);
+ list.add(
+ Expressions.return_(null, enumerable));
+ return implementor.result(physType, list.toBlock());
+ }
+
+ /** E.g. {@code constantArrayList("x", "y")} returns
+ * "Arrays.asList('x', 'y')". */
+ private static <T> MethodCallExpression constantArrayList(List<T> values,
+ Class clazz) {
+ return Expressions.call(
+ BuiltInMethod.ARRAYS_AS_LIST.method,
+ Expressions.newArrayInit(clazz, constantList(values)));
+ }
+
+ /** E.g. {@code constantList("x", "y")} returns
+ * {@code {ConstantExpression("x"), ConstantExpression("y")}}. */
+ private static <T> List<Expression> constantList(List<T> values) {
+ return Lists.transform(values,
+ new Function<T, Expression>() {
+ public Expression apply(T a0) {
+ return Expressions.constant(a0);
+ }
+ });
+ }
+}
+
+// End CassandraToEnumerableConverter.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java
new file mode 100644
index 0000000..2ded8c8
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+
+/**
+ * Planner rule that wraps a relational expression in
+ * {@link CassandraRel#CONVENTION} with a
+ * {@link CassandraToEnumerableConverter}, producing an expression in
+ * {@link EnumerableConvention}.
+ */
+public class CassandraToEnumerableConverterRule extends ConverterRule {
+  /** Shared instance; the constructor is private. */
+  public static final ConverterRule INSTANCE =
+      new CassandraToEnumerableConverterRule();
+
+  private CassandraToEnumerableConverterRule() {
+    super(
+        RelNode.class,
+        CassandraRel.CONVENTION,
+        EnumerableConvention.INSTANCE,
+        "CassandraToEnumerableConverterRule");
+  }
+
+  /** Creates a converter over {@code rel} whose traits are those of
+   * {@code rel} with the convention replaced by this rule's output
+   * convention. */
+  @Override public RelNode convert(RelNode rel) {
+    final RelTraitSet convertedTraits =
+        rel.getTraitSet().replace(getOutConvention());
+    return new CassandraToEnumerableConverter(
+        rel.getCluster(), convertedTraits, rel);
+  }
+}
+
+// End CassandraToEnumerableConverterRule.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java
new file mode 100644
index 0000000..c4be45a
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Cassandra query provider.
+ *
+ * <p>There is one table for each Cassandra column family.</p>
+ */
+@PackageMarker
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
new file mode 100644
index 0000000..8ef08c5
--- /dev/null
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.Test;
+
+/**
+ * Tests for the {@code org.apache.calcite.adapter.cassandra} package.
+ *
+ * <p>Before calling this test, you need to populate Cassandra with the
+ * "twissandra" data set, as follows:</p>
+ *
+ * <blockquote><code>
+ * git clone https://github.com/vlsi/calcite-test-dataset
+ * cd calcite-test-dataset
+ * mvn install
+ * </code></blockquote>
+ *
+ * <p>This will create a virtual machine with Cassandra and test dataset.</p>
+ */
+public class CassandraAdapterIT {
+  /** Connection factory based on the "twissandra" model. */
+  public static final ImmutableMap<String, String> TWISSANDRA =
+      ImmutableMap.of("model", resourcePath("/model.json"));
+
+  /** Whether to run Cassandra tests. Enabled by default, however test is only
+   * included if "it" profile is activated ({@code -Pit}). To disable,
+   * specify {@code -Dcalcite.test.cassandra=false} on the Java command line. */
+  public static final boolean ENABLED =
+      Util.getBooleanProperty("calcite.test.cassandra", true);
+
+  /**
+   * Returns the file-system path of a class-path resource.
+   *
+   * <p>The resource URL is converted through {@code java.io.File(URI)} rather
+   * than {@code URL.getPath()}, because the latter leaves percent-escapes
+   * (such as {@code %20} for a space) in the result, which then does not name
+   * an existing file.
+   *
+   * @param name resource name, resolved against this class
+   * @return absolute path of the resource file
+   * @throws IllegalStateException if the resource is missing or its URL is
+   *     not a valid URI
+   */
+  private static String resourcePath(String name) {
+    final java.net.URL url = CassandraAdapterIT.class.getResource(name);
+    if (url == null) {
+      throw new IllegalStateException("Test resource not found: " + name);
+    }
+    try {
+      return new java.io.File(url.toURI()).getAbsolutePath();
+    } catch (java.net.URISyntaxException e) {
+      throw new IllegalStateException(
+          "Invalid URI for test resource " + name + ": " + url, e);
+    }
+  }
+
+  /** Whether to run this test. */
+  protected boolean enabled() {
+    return ENABLED;
+  }
+
+  /** Checks the row count of an unfiltered scan of "users". */
+  @Test public void testSelect() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select * from \"users\"")
+        .returnsCount(10);
+  }
+
+  /** Checks the result and plan of an equality filter on "username". */
+  @Test public void testFilter() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select * from \"userline\" where \"username\"='!PUBLIC!'")
+        .limit(1)
+        .returns("username=!PUBLIC!; time=e8754000-80b8-1fe9-8e73-e3698c967ddd; "
+            + "tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n")
+        .explainContains("PLAN=CassandraToEnumerableConverter\n"
+            + " CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n"
+            + " CassandraTableScan(table=[[twissandra, userline]]");
+  }
+
+  /** Checks the row count and plan of a filtered query ordered by "time". */
+  @Test public void testSort() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select * from \"userline\" where \"username\" = '!PUBLIC!' order by \"time\" desc")
+        .returnsCount(146)
+        .explainContains("PLAN=CassandraToEnumerableConverter\n"
+            + " CassandraSort(sort0=[$1], dir0=[DESC])\n"
+            + " CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
+  }
+
+  /** Checks the result and plan of a single-column projection with a limit. */
+  @Test public void testProject() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 1")
+        .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n")
+        .explainContains("PLAN=CassandraToEnumerableConverter\n"
+            + " CassandraProject(tweet_id=[$2])\n"
+            + " CassandraSort(fetch=[1])\n"
+            + " CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
+  }
+}
+
+// End CassandraAdapterIT.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/test/resources/model.json
----------------------------------------------------------------------
diff --git a/cassandra/src/test/resources/model.json b/cassandra/src/test/resources/model.json
new file mode 100644
index 0000000..29ca31e
--- /dev/null
+++ b/cassandra/src/test/resources/model.json
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ version: '1.0',
+ defaultSchema: 'twissandra',
+ schemas: [
+ {
+ name: 'twissandra',
+ type: 'custom',
+ factory: 'org.apache.calcite.adapter.cassandra.CassandraSchemaFactory',
+ operand: {
+ host: 'localhost',
+ keyspace: 'twissandra'
+ }
+ }
+ ]
+}