You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/02/20 00:35:24 UTC

[2/5] calcite git commit: [CALCITE-1080] Cassandra adapter (Michael Mior)

[CALCITE-1080] Cassandra adapter (Michael Mior)

Close apache/calcite#195


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/91887366
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/91887366
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/91887366

Branch: refs/heads/master
Commit: 91887366c40310f2435b2677ac0d9616bed842cb
Parents: 95fd041
Author: Michael Mior <mm...@uwaterloo.ca>
Authored: Thu Feb 18 14:49:20 2016 -0500
Committer: Julian Hyde <jh...@apache.org>
Committed: Thu Feb 18 21:16:50 2016 -0800

----------------------------------------------------------------------
 cassandra/pom.xml                               | 149 ++++++++
 .../adapter/cassandra/CassandraEnumerator.java  | 113 ++++++
 .../adapter/cassandra/CassandraFilter.java      | 277 ++++++++++++++
 .../adapter/cassandra/CassandraMethod.java      |  51 +++
 .../adapter/cassandra/CassandraProject.java     |  79 ++++
 .../calcite/adapter/cassandra/CassandraRel.java |  75 ++++
 .../adapter/cassandra/CassandraRules.java       | 377 +++++++++++++++++++
 .../adapter/cassandra/CassandraSchema.java      | 167 ++++++++
 .../cassandra/CassandraSchemaFactory.java       |  42 +++
 .../adapter/cassandra/CassandraSort.java        |  89 +++++
 .../adapter/cassandra/CassandraTable.java       | 205 ++++++++++
 .../adapter/cassandra/CassandraTableScan.java   |  78 ++++
 .../CassandraToEnumerableConverter.java         | 143 +++++++
 .../CassandraToEnumerableConverterRule.java     |  43 +++
 .../calcite/adapter/cassandra/package-info.java |  28 ++
 .../apache/calcite/test/CassandraAdapterIT.java | 102 +++++
 cassandra/src/test/resources/model.json         |  31 ++
 pom.xml                                         |   7 +
 site/_docs/adapter.md                           |   1 +
 sqlline                                         |   2 +-
 sqlline.bat                                     |   2 +-
 21 files changed, 2059 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/cassandra/pom.xml b/cassandra/pom.xml
new file mode 100644
index 0000000..42fc6bf
--- /dev/null
+++ b/cassandra/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.calcite</groupId>
+    <artifactId>calcite</artifactId>
+    <version>1.7.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>calcite-cassandra</artifactId>
+  <packaging>jar</packaging>
+  <version>1.7.0-SNAPSHOT</version>
+  <name>Calcite Cassandra</name>
+  <description>Cassandra adapter for Calcite</description>
+
+  <properties>
+    <top.dir>${project.basedir}/..</top.dir>
+  </properties>
+
+  <dependencies>
+    <!-- Sorted by groupId, artifactId; calcite dependencies first. Put versions
+         in dependencyManagement in the root POM, not here. -->
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-linq4j</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Sorted by groupId, artifactId. Put versions in
+           pluginManagement in the root POM, not here. -->
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>analyze</id>
+            <goals>
+              <goal>analyze-only</goal>
+            </goals>
+            <configuration>
+              <failOnWarning>true</failOnWarning>
+              <!-- ignore "unused but declared" warnings -->
+              <ignoredUnusedDeclaredDependencies>
+                <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-api</ignoredUnusedDeclaredDependency>
+                <ignoredUnusedDeclaredDependency>org.slf4j:slf4j-log4j12</ignoredUnusedDeclaredDependency>
+              </ignoredUnusedDeclaredDependencies>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-release-plugin</artifactId>
+      </plugin>
+      <!-- Parent module has the same plugin and does the work of
+           generating -sources.jar for each project. But without the
+           plugin declared here, IDEs don't know the sources are
+           available. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>jar-no-fork</goal>
+              <goal>test-jar-no-fork</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-remote-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>non-root-resources</id>
+            <goals>
+              <goal>process</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
new file mode 100644
index 0000000..0c06800
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+
+import java.util.Iterator;
+import java.util.List;
+
+/** Enumerator that reads from a Cassandra column family. */
+class CassandraEnumerator implements Enumerator<Object> {
+  private Iterator<Row> iterator;
+  private Row current;
+  private List<RelDataTypeField> fieldTypes;
+
+  /** Creates a CassandraEnumerator.
+   *
+   * @param results Cassandra result set ({@link com.datastax.driver.core.ResultSet})
+   * @param protoRowType The type of resulting rows
+   */
+  public CassandraEnumerator(ResultSet results, RelProtoDataType protoRowType) {
+    this.iterator = results.iterator();
+    this.current = null;
+
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    this.fieldTypes = protoRowType.apply(typeFactory).getFieldList();
+  }
+
+  /** Produce the next row from the results
+   *
+   * @return A new row from the results
+   */
+  public Object current() {
+    if (fieldTypes.size() == 1) {
+      // If we just have one field, produce it directly
+      return currentRowField(0, fieldTypes.get(0).getType().getSqlTypeName());
+    } else {
+      // Build an array with all fields in this row
+      Object[] row = new Object[fieldTypes.size()];
+      for (int i = 0; i < fieldTypes.size(); i++) {
+        row[i] = currentRowField(i, fieldTypes.get(i).getType().getSqlTypeName());
+      }
+
+      return row;
+    }
+  }
+
+  /** Get a field for the current row from the underlying object.
+   *
+   * @param index Index of the field within the Row object
+   * @param typeName Type of the field in this row
+   */
+  private Object currentRowField(int index, SqlTypeName typeName) {
+    DataType type = current.getColumnDefinitions().getType(index);
+    if (type == DataType.ascii() || type == DataType.text() || type == DataType.varchar()) {
+      return current.getString(index);
+    } else if (type == DataType.cint() || type == DataType.varint()) {
+      return current.getInt(index);
+    } else if (type == DataType.bigint()) {
+      return current.getLong(index);
+    } else if (type == DataType.cdouble() || type == DataType.cfloat()) {
+      return current.getDouble(index);
+    } else if (type == DataType.uuid() || type == DataType.timeuuid()) {
+      return current.getUUID(index).toString();
+    } else {
+      return null;
+    }
+  }
+
+  public boolean moveNext() {
+    if (iterator.hasNext()) {
+      current = iterator.next();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  public void close() {
+    // Nothing to do here
+  }
+}
+
+// End CassandraEnumerator.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
new file mode 100644
index 0000000..41eb3ad
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Implementation of a {@link org.apache.calcite.rel.core.Filter}
+ * relational expression in Cassandra.
+ */
+public class CassandraFilter extends Filter implements CassandraRel {
+  private final List<String> partitionKeys;
+  private Boolean singlePartition;
+  private final List<String> clusteringKeys;
+  private List<RelFieldCollation> implicitFieldCollations;
+  private RelCollation implicitCollation;
+  private String match;
+
+  public CassandraFilter(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode child,
+      RexNode condition,
+      List<String> partitionKeys,
+      List<String> clusteringKeys,
+      List<RelFieldCollation> implicitFieldCollations) {
+    super(cluster, traitSet, child, condition);
+
+    this.partitionKeys = partitionKeys;
+    this.singlePartition = false;
+    this.clusteringKeys = new ArrayList<String>(clusteringKeys);
+    this.implicitFieldCollations = implicitFieldCollations;
+
+    Translator translator =
+        new Translator(CassandraRules.cassandraFieldNames(getRowType()),
+            partitionKeys, clusteringKeys, implicitFieldCollations);
+    this.match = translator.translateMatch(condition);
+    this.singlePartition = translator.isSinglePartition();
+    this.implicitCollation = translator.getImplicitCollation();
+
+    assert getConvention() == CassandraRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  public CassandraFilter copy(RelTraitSet traitSet, RelNode input,
+      RexNode condition) {
+    return new CassandraFilter(getCluster(), traitSet, input, condition,
+        partitionKeys, clusteringKeys, implicitFieldCollations);
+  }
+
+  public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    implementor.add(null, Collections.singletonList(match));
+  }
+
+  /** Check if the filter restricts to a single partition.
+   *
+   * @return True if the filter will restrict the underlying to a single partition
+   */
+  public boolean isSinglePartition() {
+    return singlePartition;
+  }
+
+  /** Get the resulting collation by the clustering keys after filtering.
+   *
+   * @return The implicit collation based on the natural sorting by clustering keys
+   */
+  public RelCollation getImplicitCollation() {
+    return implicitCollation;
+  }
+
+  /** Translates {@link RexNode} expressions into Cassandra expression strings. */
+  static class Translator {
+    private final List<String> fieldNames;
+    private final Set<String> partitionKeys;
+    private final List<String> clusteringKeys;
+    private int restrictedClusteringKeys;
+    private final List<RelFieldCollation> implicitFieldCollations;
+
+    Translator(List<String> fieldNames, List<String> partitionKeys, List<String> clusteringKeys,
+        List<RelFieldCollation> implicitFieldCollations) {
+      this.fieldNames = fieldNames;
+      this.partitionKeys = new HashSet<String>(partitionKeys);
+      this.clusteringKeys = clusteringKeys;
+      this.restrictedClusteringKeys = 0;
+      this.implicitFieldCollations = implicitFieldCollations;
+    }
+
+    /** Check if the query spans only one partition.
+     *
+     * @return True if the matches translated so far have resulted in a single partition
+     */
+    public boolean isSinglePartition() {
+      return partitionKeys.isEmpty();
+    }
+
+    /** Infer the implicit correlation from the unrestricted clustering keys.
+     *
+     * @return The collation of the filtered results
+     */
+    public RelCollation getImplicitCollation() {
+      // No collation applies if we aren't restricted to a single partition
+      if (!isSinglePartition()) {
+        return RelCollations.EMPTY;
+      }
+
+      // Pull out the correct fields along with their original collations
+      List<RelFieldCollation> fieldCollations = new ArrayList<RelFieldCollation>();
+      for (int i = restrictedClusteringKeys; i < clusteringKeys.size(); i++) {
+        int fieldIndex = fieldNames.indexOf(clusteringKeys.get(i));
+        RelFieldCollation.Direction direction = implicitFieldCollations.get(i).getDirection();
+        fieldCollations.add(new RelFieldCollation(fieldIndex, direction));
+      }
+
+      return RelCollations.of(fieldCollations);
+    }
+
+    /** Produce the CQL predicate string for the given condition.
+     *
+     * @param condition Condition to translate
+     * @return CQL predicate string
+     */
+    private String translateMatch(RexNode condition) {
+      // CQL does not support disjunctions
+      List<RexNode> disjunctions = RelOptUtil.disjunctions(condition);
+      if (disjunctions.size() == 1) {
+        return translateAnd(disjunctions.get(0));
+      } else {
+        throw new AssertionError("cannot translate " + condition);
+      }
+    }
+
+    /** Conver the value of a literal to a string.
+     *
+     * @param literal Literal to translate
+     * @return String representation of the literal
+     */
+    private static String literalValue(RexLiteral literal) {
+      Object value = literal.getValue2();
+      StringBuilder buf = new StringBuilder();
+      buf.append(value);
+      return buf.toString();
+    }
+
+    /** Translate a conjunctive predicate to a CQL string.
+     *
+     * @param condition A conjunctive predicate
+     * @return CQL string for the predicate
+     */
+    private String translateAnd(RexNode condition) {
+      List<String> predicates = new ArrayList<String>();
+      for (RexNode node : RelOptUtil.conjunctions(condition)) {
+        predicates.add(translateMatch2(node));
+      }
+
+      return Util.toString(predicates, "", " AND ", "");
+    }
+
+    /** Translate a binary relation. */
+    private String translateMatch2(RexNode node) {
+      // We currently only use equality, but inequalities on clustering keys
+      // should be possible in the future
+      switch (node.getKind()) {
+      case EQUALS:
+        return translateBinary("=", "=", (RexCall) node);
+      case LESS_THAN:
+        return translateBinary("<", ">", (RexCall) node);
+      case LESS_THAN_OR_EQUAL:
+        return translateBinary("<=", ">=", (RexCall) node);
+      case GREATER_THAN:
+        return translateBinary(">", "<", (RexCall) node);
+      case GREATER_THAN_OR_EQUAL:
+        return translateBinary(">=", "<=", (RexCall) node);
+      default:
+        throw new AssertionError("cannot translate " + node);
+      }
+    }
+
+    /** Translates a call to a binary operator, reversing arguments if
+     * necessary. */
+    private String translateBinary(String op, String rop, RexCall call) {
+      final RexNode left = call.operands.get(0);
+      final RexNode right = call.operands.get(1);
+      String expression = translateBinary2(op, left, right);
+      if (expression != null) {
+        return expression;
+      }
+      expression = translateBinary2(rop, right, left);
+      if (expression != null) {
+        return expression;
+      }
+      throw new AssertionError("cannot translate op " + op + " call " + call);
+    }
+
+    /** Translates a call to a binary operator. Returns null on failure. */
+    private String translateBinary2(String op, RexNode left, RexNode right) {
+      switch (right.getKind()) {
+      case LITERAL:
+        break;
+      default:
+        return null;
+      }
+      final RexLiteral rightLiteral = (RexLiteral) right;
+      switch (left.getKind()) {
+      case INPUT_REF:
+        final RexInputRef left1 = (RexInputRef) left;
+        String name = fieldNames.get(left1.getIndex());
+        return translateOp2(op, name, rightLiteral);
+      case CAST:
+        // FIXME This will not work in all cases (for example, we ignore string encoding)
+        return translateBinary2(op, ((RexCall) left).operands.get(0), right);
+      default:
+        return null;
+      }
+    }
+
+    /** Combines a field name, operator, and literal to produce a predicate string. */
+    private String translateOp2(String op, String name, RexLiteral right) {
+      // In case this is a key, record that it is now restricted
+      if (op.equals("=")) {
+        partitionKeys.remove(name);
+        if (clusteringKeys.contains(name)) {
+          restrictedClusteringKeys++;
+        }
+      }
+
+      Object value = literalValue(right);
+      String valueString = value.toString();
+      if (value instanceof String) {
+        valueString = "'" + valueString + "'";
+      }
+      return name + " " + op + " " + valueString;
+    }
+  }
+}
+
+// End CassandraFilter.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
new file mode 100644
index 0000000..8f6a59a
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraMethod.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.linq4j.tree.Types;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Builtin methods in the Cassandra adapter.
+ */
+public enum CassandraMethod {
+  CASSANDRA_QUERYABLE_QUERY(CassandraTable.CassandraQueryable.class, "query",
+      List.class, List.class, List.class, String.class);
+
+  public final Method method;
+
+  public static final ImmutableMap<Method, CassandraMethod> MAP;
+
+  static {
+    final ImmutableMap.Builder<Method, CassandraMethod> builder =
+        ImmutableMap.builder();
+    for (CassandraMethod value : CassandraMethod.values()) {
+      builder.put(value.method, value);
+    }
+    MAP = builder.build();
+  }
+
+  CassandraMethod(Class clazz, String methodName, Class... argumentTypes) {
+    this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
+  }
+}
+
+// End CassandraMethod.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
new file mode 100644
index 0000000..953e89d
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Cassandra.
+ */
+public class CassandraProject extends Project implements CassandraRel {
+  public CassandraProject(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+    super(cluster, traitSet, input, projects, rowType);
+    assert getConvention() == CassandraRel.CONVENTION;
+    assert getConvention() == input.getConvention();
+  }
+
+  @Override public Project copy(RelTraitSet traitSet, RelNode input,
+      List<RexNode> projects, RelDataType rowType) {
+    return new CassandraProject(getCluster(), traitSet, input, projects,
+        rowType);
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+  }
+
+  public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+    final CassandraRules.RexToCassandraTranslator translator =
+        new CassandraRules.RexToCassandraTranslator(
+            (JavaTypeFactory) getCluster().getTypeFactory(),
+            CassandraRules.cassandraFieldNames(getInput().getRowType()));
+    final List<String> fields = new ArrayList<String>();
+    for (Pair<RexNode, String> pair : getNamedProjects()) {
+      final String name = pair.right;
+      final String expr = pair.left.accept(translator);
+
+      // Alias the field if necessary
+      if (name.equals(expr)) {
+        fields.add(name);
+      } else {
+        fields.add(name + " AS " + expr);
+      }
+    }
+    implementor.add(fields, null);
+  }
+}
+
+// End CassandraProject.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
new file mode 100644
index 0000000..c7f6174
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRel.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Relational expression that uses Cassandra calling convention.
+ */
+public interface CassandraRel extends RelNode {
+  void implement(Implementor implementor);
+
+  /** Calling convention for relational operations that occur in Cassandra. */
+  Convention CONVENTION = new Convention.Impl("CASSANDRA", CassandraRel.class);
+
+  /** Callback for the implementation process that converts a tree of
+   * {@link CassandraRel} nodes into a CQL query. */
+  class Implementor {
+    final List<String> selectFields = new ArrayList<String>();
+    final List<String> whereClause = new ArrayList<String>();
+    String limitValue = null;
+    final List<String> order = new ArrayList<String>();
+
+    RelOptTable table;
+    CassandraTable cassandraTable;
+
+    /** Adds newly projected fields and restricted predicates.
+     *
+     * @param fields New fields to be projected from a query
+     * @param predicates New predicates to be applied to the query
+     */
+    public void add(List<String> fields, List<String> predicates) {
+      if (fields != null) {
+        selectFields.addAll(fields);
+      }
+      if (predicates != null) {
+        whereClause.addAll(predicates);
+      }
+    }
+
+    public void addOrder(List<String> newOrder) {
+      order.addAll(newOrder);
+    }
+
+    public void setLimit(String limit) {
+      limitValue = limit;
+    }
+
+    public void visitChild(int ordinal, RelNode input) {
+      assert ordinal == 0;
+      ((CassandraRel) input).implement(this);
+    }
+  }
+}
+
+// End CassandraRel.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
new file mode 100644
index 0000000..72cbcd5
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+import java.util.AbstractList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Rules and relational operators for
+ * {@link CassandraRel#CONVENTION}
+ * calling convention.
+ */
+public class CassandraRules {
+  private CassandraRules() {}
+
+  public static final RelOptRule[] RULES = {
+    CassandraFilterRule.INSTANCE,
+    CassandraProjectRule.INSTANCE,
+    CassandraSortRule.INSTANCE
+  };
+
+  static List<String> cassandraFieldNames(final RelDataType rowType) {
+    return SqlValidatorUtil.uniquify(
+        new AbstractList<String>() {
+          @Override public String get(int index) {
+            return rowType.getFieldList().get(index).getName();
+          }
+
+          @Override public int size() {
+            return rowType.getFieldCount();
+          }
+        });
+  }
+
+  /** Translator from {@link RexNode} to strings in Cassandra's expression
+   * language. */
+  static class RexToCassandraTranslator extends RexVisitorImpl<String> {
+    private final JavaTypeFactory typeFactory;
+    private final List<String> inFields;
+
+    protected RexToCassandraTranslator(JavaTypeFactory typeFactory,
+        List<String> inFields) {
+      super(true);
+      this.typeFactory = typeFactory;
+      this.inFields = inFields;
+    }
+
+    @Override public String visitInputRef(RexInputRef inputRef) {
+      return inFields.get(inputRef.getIndex());
+    }
+  }
+
+  /** Base class for planner rules that convert a relational expression to
+   * Cassandra calling convention. */
+  abstract static class CassandraConverterRule extends ConverterRule {
+    protected final Convention out;
+
+    public CassandraConverterRule(
+        Class<? extends RelNode> clazz,
+        String description) {
+      this(clazz, Predicates.<RelNode>alwaysTrue(), description);
+    }
+
+    public <R extends RelNode> CassandraConverterRule(
+        Class<R> clazz,
+        Predicate<? super R> predicate,
+        String description) {
+      super(clazz, predicate, Convention.NONE, CassandraRel.CONVENTION, description);
+      this.out = CassandraRel.CONVENTION;
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalFilter} to a
+   * {@link CassandraFilter}.
+   */
+  private static class CassandraFilterRule extends RelOptRule {
+    private static final Predicate<LogicalFilter> PREDICATE =
+        new Predicate<LogicalFilter>() {
+          public boolean apply(LogicalFilter input) {
+            // TODO: Check for an equality predicate on the partition key
+            // Right now this just checks if we have a single top-level AND
+            return RelOptUtil.disjunctions(input.getCondition()).size() == 1;
+          }
+        };
+
+    private static final CassandraFilterRule INSTANCE = new CassandraFilterRule();
+
+    private CassandraFilterRule() {
+      super(operand(LogicalFilter.class, operand(CassandraTableScan.class, none())),
+          "CassandraFilterRule");
+    }
+
+    @Override public boolean matches(RelOptRuleCall call) {
+      // Get the condition from the filter operation
+      LogicalFilter filter = call.rel(0);
+      RexNode condition = filter.getCondition();
+
+      // Get field names from the scan operation
+      CassandraTableScan scan = call.rel(1);
+      Pair<List<String>, List<String>> keyFields = scan.cassandraTable.getKeyFields();
+      Set<String> partitionKeys = new HashSet<String>(keyFields.left);
+      List<String> fieldNames = CassandraRules.cassandraFieldNames(filter.getInput().getRowType());
+
+      List<RexNode> disjunctions = RelOptUtil.disjunctions(condition);
+      if (disjunctions.size() != 1) {
+        return false;
+      } else {
+        // Check that all conjunctions are primary key equalities
+        condition = disjunctions.get(0);
+        for (RexNode predicate : RelOptUtil.conjunctions(condition)) {
+          if (!isEqualityOnKey(predicate, fieldNames, partitionKeys, keyFields.right)) {
+            return false;
+          }
+        }
+      }
+
+      // Either all of the partition keys must be specified or none
+      return partitionKeys.size() == keyFields.left.size() || partitionKeys.size() == 0;
+    }
+
+    /** Check if the node is a supported predicate (primary key equality).
+     *
+     * @param node Condition node to check
+     * @param fieldNames Names of all columns in the table
+     * @param partitionKeys Names of primary key columns
+     * @param clusteringKeys Names of primary key columns
+     * @return True if the node represents an equality predicate on a primary key
+     */
+    private boolean isEqualityOnKey(RexNode node, List<String> fieldNames,
+        Set<String> partitionKeys, List<String> clusteringKeys) {
+      if (node.getKind() != SqlKind.EQUALS) {
+        return false;
+      }
+
+      RexCall call = (RexCall) node;
+      final RexNode left = call.operands.get(0);
+      final RexNode right = call.operands.get(1);
+      String key = compareFieldWithLiteral(left, right, fieldNames);
+      if (key == null) {
+        key = compareFieldWithLiteral(right, left, fieldNames);
+      }
+      if (key != null) {
+        return partitionKeys.remove(key) || clusteringKeys.contains(key);
+      } else {
+        return false;
+      }
+    }
+
+    /** Check if an equality operation is comparing a primary key column with a literal.
+     *
+     * @param left Left operand of the equality
+     * @param right Right operand of the equality
+     * @param fieldNames Names of all columns in the table
+     * @return The field being compared or null if there is no key equality
+     */
+    private String compareFieldWithLiteral(RexNode left, RexNode right, List<String> fieldNames) {
+      // FIXME Ignore casts for new and assume they aren't really necessary
+      if (left.isA(SqlKind.CAST)) {
+        left = ((RexCall) left).getOperands().get(0);
+      }
+
+      if (left.isA(SqlKind.INPUT_REF) && right.isA(SqlKind.LITERAL)) {
+        final RexInputRef left1 = (RexInputRef) left;
+        String name = fieldNames.get(left1.getIndex());
+        return name;
+      } else {
+        return null;
+      }
+    }
+
+    /** @see org.apache.calcite.rel.convert.ConverterRule */
+    public void onMatch(RelOptRuleCall call) {
+      LogicalFilter filter = call.rel(0);
+      CassandraTableScan scan = call.rel(1);
+      if (filter.getTraitSet().contains(Convention.NONE)) {
+        final RelNode converted = convert(filter, scan);
+        if (converted != null) {
+          call.transformTo(converted);
+        }
+      }
+    }
+
+    public RelNode convert(LogicalFilter filter, CassandraTableScan scan) {
+      final RelTraitSet traitSet = filter.getTraitSet().replace(CassandraRel.CONVENTION);
+      final Pair<List<String>, List<String>> keyFields = scan.cassandraTable.getKeyFields();
+      return new CassandraFilter(
+          filter.getCluster(),
+          traitSet,
+          convert(filter.getInput(), CassandraRel.CONVENTION),
+          filter.getCondition(),
+          keyFields.left,
+          keyFields.right,
+          scan.cassandraTable.getClusteringOrder());
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
+   * to a {@link CassandraProject}.
+   */
+  private static class CassandraProjectRule extends CassandraConverterRule {
+    private static final CassandraProjectRule INSTANCE = new CassandraProjectRule();
+
+    private CassandraProjectRule() {
+      super(LogicalProject.class, "CassandraProjectRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      final LogicalProject project = (LogicalProject) rel;
+      final RelTraitSet traitSet = project.getTraitSet().replace(out);
+      return new CassandraProject(project.getCluster(), traitSet,
+          convert(project.getInput(), out), project.getProjects(),
+          project.getRowType());
+    }
+  }
+
+  /**
+   * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
+   * {@link CassandraSort}.
+   */
+  private static class CassandraSortRule extends RelOptRule {
+    private static final Predicate<Sort> SORT_PREDICATE =
+        new Predicate<Sort>() {
+          public boolean apply(Sort input) {
+            // CQL has no support for offsets
+            return input.offset == null;
+          }
+        };
+    private static final Predicate<CassandraFilter> FILTER_PREDICATE =
+        new Predicate<CassandraFilter>() {
+          public boolean apply(CassandraFilter input) {
+            // We can only use implicit sorting within a single partition
+            return input.isSinglePartition();
+          }
+        };
+    private static final RelOptRuleOperand CASSANDRA_OP =
+        operand(CassandraToEnumerableConverter.class,
+        operand(CassandraFilter.class, null, FILTER_PREDICATE, any()));
+
+    private static final CassandraSortRule INSTANCE = new CassandraSortRule();
+
+    private CassandraSortRule() {
+      super(operand(Sort.class, null, SORT_PREDICATE, CASSANDRA_OP), "CassandraSortRule");
+    }
+
+    public RelNode convert(Sort sort, CassandraFilter filter) {
+      final RelTraitSet traitSet =
+          sort.getTraitSet().replace(CassandraRel.CONVENTION)
+              .replace(sort.getCollation());
+      return new CassandraSort(sort.getCluster(), traitSet,
+          convert(sort.getInput(), traitSet.replace(RelCollations.EMPTY)),
+          sort.getCollation(), filter.getImplicitCollation(), sort.fetch);
+    }
+
+    public boolean matches(RelOptRuleCall call) {
+      final Sort sort = call.rel(0);
+      final CassandraFilter filter = call.rel(2);
+      return collationsCompatible(sort.getCollation(), filter.getImplicitCollation());
+    }
+
+    /** Check if it is possible to exploit native CQL sorting for a given collation.
+     *
+     * @return True if it is possible to achieve this sort in Cassandra
+     */
+    private boolean collationsCompatible(RelCollation sortCollation,
+        RelCollation implicitCollation) {
+      List<RelFieldCollation> sortFieldCollations = sortCollation.getFieldCollations();
+      List<RelFieldCollation> implicitFieldCollations = implicitCollation.getFieldCollations();
+
+      if (sortFieldCollations.size() > implicitFieldCollations.size()) {
+        return false;
+      }
+      if (sortFieldCollations.size() == 0) {
+        return true;
+      }
+
+      // Check if we need to reverse the order of the implicit collation
+      boolean reversed = reverseDirection(sortFieldCollations.get(0).getDirection())
+          == implicitFieldCollations.get(0).getDirection();
+
+      for (int i = 0; i < sortFieldCollations.size(); i++) {
+        RelFieldCollation sorted = sortFieldCollations.get(i);
+        RelFieldCollation implied = implicitFieldCollations.get(i);
+
+        // Check that the fields being sorted match
+        if (sorted.getFieldIndex() != implied.getFieldIndex()) {
+          return false;
+        }
+
+        // Either all fields must be sorted in the same direction
+        // or the opposite direction based on whether we decided
+        // if the sort direction should be reversed above
+        RelFieldCollation.Direction sortDirection = sorted.getDirection();
+        RelFieldCollation.Direction implicitDirection = implied.getDirection();
+        if ((!reversed && sortDirection != implicitDirection)
+            || (reversed && reverseDirection(sortDirection) != implicitDirection)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    /** Find the reverse of a given collation direction.
+     *
+     * @return Reverse of the input direction
+     */
+    private RelFieldCollation.Direction reverseDirection(RelFieldCollation.Direction direction) {
+      switch(direction) {
+      case ASCENDING:
+      case STRICTLY_ASCENDING:
+        return RelFieldCollation.Direction.DESCENDING;
+      case DESCENDING:
+      case STRICTLY_DESCENDING:
+        return RelFieldCollation.Direction.ASCENDING;
+      default:
+        return null;
+      }
+    }
+
+    /** @see org.apache.calcite.rel.convert.ConverterRule */
+    public void onMatch(RelOptRuleCall call) {
+      final Sort sort = call.rel(0);
+      CassandraFilter filter = call.rel(2);
+      final RelNode converted = convert(sort, filter);
+      if (converted != null) {
+        call.transformTo(converted);
+      }
+    }
+  }
+}
+
+// End CassandraRules.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
new file mode 100644
index 0000000..4745aa1
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Schema mapped onto a Cassandra column family
+ */
+public class CassandraSchema extends AbstractSchema {
+  final Session session;
+  final String keyspace;
+
+  /**
+   * Creates a Cassandra schema.
+   *
+   * @param host Cassandra host, e.g. "localhost"
+   * @param keyspace Cassandra keyspace name, e.g. "twissandra"
+   */
+  public CassandraSchema(String host, String keyspace) {
+    super();
+
+    this.keyspace = keyspace;
+    try {
+      Cluster cluster = Cluster.builder().addContactPoint(host).build();
+      this.session = cluster.connect(keyspace);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  RelProtoDataType getRelDataType(String columnFamily) {
+    List<ColumnMetadata> columns = getKeyspace().getTable(columnFamily).getColumns();
+
+    // Temporary type factory, just for the duration of this method. Allowable
+    // because we're creating a proto-type, not a type; before being used, the
+    // proto-type will be copied into a real type factory.
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
+    for (ColumnMetadata column : columns) {
+      final String columnName = column.getName();
+      final DataType type = column.getType();
+
+      // TODO: This mapping of types can be done much better
+      SqlTypeName typeName = SqlTypeName.ANY;
+      if (type == DataType.ascii() || type == DataType.text() || type == DataType.varchar()
+            || type == DataType.uuid() || type == DataType.timeuuid()) {
+        typeName = SqlTypeName.CHAR;
+      } else if (type == DataType.cint() || type == DataType.varint()) {
+        typeName = SqlTypeName.INTEGER;
+      } else if (type == DataType.bigint()) {
+        typeName = SqlTypeName.BIGINT;
+      } else if (type == DataType.cdouble() || type == DataType.cfloat()
+          || type == DataType.decimal()) {
+        typeName = SqlTypeName.DOUBLE;
+      }
+
+      fieldInfo.add(columnName, typeFactory.createSqlType(typeName)).nullable(true);
+    }
+
+    return RelDataTypeImpl.proto(fieldInfo.build());
+  }
+
+  /**
+   * Get all primary key columns from the underlying CQL table
+   *
+   * @return A list of field names that are part of the partition and clustering keys
+   */
+  Pair<List<String>, List<String>> getKeyFields(String columnFamily) {
+    TableMetadata table = getKeyspace().getTable(columnFamily);
+
+    List<ColumnMetadata> partitionKey = table.getPartitionKey();
+    List<String> pKeyFields = new ArrayList<String>();
+    for (ColumnMetadata column : partitionKey) {
+      pKeyFields.add(column.getName());
+    }
+
+    List<ColumnMetadata> clusteringKey = table.getClusteringColumns();
+    List<String> cKeyFields = new ArrayList<String>();
+    for (ColumnMetadata column : clusteringKey) {
+      cKeyFields.add(column.getName());
+    }
+
+    return Pair.of((List<String>) ImmutableList.copyOf(pKeyFields),
+        (List<String>) ImmutableList.copyOf(cKeyFields));
+  }
+
+  /** Get the collation of all clustering key columns.
+   *
+   * @return A RelCollations representing the collation of all clustering keys
+   */
+  public List<RelFieldCollation> getClusteringOrder(String columnFamily) {
+    TableMetadata table = getKeyspace().getTable(columnFamily);
+    List<TableMetadata.Order> clusteringOrder = table.getClusteringOrder();
+    List<RelFieldCollation> keyCollations = new ArrayList<RelFieldCollation>();
+
+    int i = 0;
+    for (TableMetadata.Order order : clusteringOrder) {
+      RelFieldCollation.Direction direction;
+      switch(order) {
+      case DESC:
+        direction = RelFieldCollation.Direction.DESCENDING;
+        break;
+      case ASC:
+      default:
+        direction = RelFieldCollation.Direction.ASCENDING;
+        break;
+      }
+      keyCollations.add(new RelFieldCollation(i, direction));
+      i++;
+    }
+
+    return keyCollations;
+  }
+
+  @Override protected Map<String, Table> getTableMap() {
+    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+    for (TableMetadata table : getKeyspace().getTables()) {
+      String tableName = table.getName();
+      builder.put(tableName, new CassandraTable(this, tableName));
+    }
+    return builder.build();
+  }
+
+  private KeyspaceMetadata getKeyspace() {
+    return session.getCluster().getMetadata().getKeyspace(keyspace);
+  }
+}
+
+// End CassandraSchema.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
new file mode 100644
index 0000000..7e52717
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaFactory;
+import org.apache.calcite.schema.SchemaPlus;
+
+import java.util.Map;
+
+/**
+ * Factory that creates a {@link CassandraSchema}
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class CassandraSchemaFactory implements SchemaFactory {
+  public CassandraSchemaFactory() {
+  }
+
+  public Schema create(SchemaPlus parentSchema, String name,
+      Map<String, Object> operand) {
+    Map map = (Map) operand;
+    String host = (String) map.get("host");
+    String keyspace = (String) map.get("keyspace");
+    return new CassandraSchema(host, keyspace);
+  }
+}
+
+// End CassandraSchemaFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
new file mode 100644
index 0000000..89f23ed
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSort.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Sort}
+ * relational expression in Cassandra.
+ */
+public class CassandraSort extends Sort implements CassandraRel {
+  private final RelCollation implicitCollation;
+
+  public CassandraSort(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode child, RelCollation collation, RelCollation implicitCollation, RexNode fetch) {
+    super(cluster, traitSet, child, collation, null, fetch);
+
+    this.implicitCollation = implicitCollation;
+
+    assert getConvention() == CassandraRel.CONVENTION;
+    assert getConvention() == child.getConvention();
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(0.05);
+  }
+
+  @Override public Sort copy(RelTraitSet traitSet, RelNode input,
+      RelCollation newCollation, RexNode offset, RexNode fetch) {
+    return new CassandraSort(getCluster(), traitSet, input, collation, implicitCollation,
+        fetch);
+  }
+
+  public void implement(Implementor implementor) {
+    implementor.visitChild(0, getInput());
+
+    List<RelFieldCollation> sortCollations = collation.getFieldCollations();
+    List<String> fieldOrder = new ArrayList<String>();
+    if (!sortCollations.isEmpty()) {
+      // Construct a series of order clauses from the desired collation
+      final List<RelDataTypeField> fields = getRowType().getFieldList();
+      for (RelFieldCollation fieldCollation : sortCollations) {
+        final String name =
+            fields.get(fieldCollation.getFieldIndex()).getName();
+        String direction = "ASC";
+        if (fieldCollation.getDirection().equals(RelFieldCollation.Direction.DESCENDING)) {
+          direction = "DESC";
+        }
+        fieldOrder.add(name + " " + direction);
+      }
+
+      implementor.addOrder(fieldOrder);
+    }
+    if (fetch != null) {
+      implementor.setLimit(((RexLiteral) fetch).getValue().toString());
+    }
+  }
+}
+
+// End CassandraSort.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
new file mode 100644
index 0000000..323d472
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.java.AbstractQueryableTable;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTableQueryable;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table based on a Cassandra column family
+ */
+public class CassandraTable extends AbstractQueryableTable
+    implements TranslatableTable {
+  RelProtoDataType protoRowType;
+  Pair<List<String>, List<String>> keyFields;
+  List<RelFieldCollation> clusteringOrder;
+  private final CassandraSchema schema;
+  private final String columnFamily;
+
+  public CassandraTable(CassandraSchema schema, String columnFamily) {
+    super(Object[].class);
+    this.schema = schema;
+    this.columnFamily = columnFamily;
+  }
+
+  public String toString() {
+    return "CassandraTable {" + columnFamily + "}";
+  }
+
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    if (protoRowType == null) {
+      protoRowType = schema.getRelDataType(columnFamily);
+    }
+    return protoRowType.apply(typeFactory);
+  }
+
+  public Pair<List<String>, List<String>> getKeyFields() {
+    if (keyFields == null) {
+      keyFields = schema.getKeyFields(columnFamily);
+    }
+    return keyFields;
+  }
+
+  public List<RelFieldCollation> getClusteringOrder() {
+    if (clusteringOrder == null) {
+      clusteringOrder = schema.getClusteringOrder(columnFamily);
+    }
+    return clusteringOrder;
+  }
+
+  public Enumerable<Object> query(final Session session) {
+    return query(session, Collections.<Map.Entry<String, Class>>emptyList(),
+        Collections.<String>emptyList(), Collections.<String>emptyList(), null);
+  }
+
+  /** Executes a CQL query on the underlying table.
+   *
+   * @param session Cassandra session
+   * @param fields List of fields to project
+   * @param predicates A list of predicates which should be used in the query
+   * @return Enumerator of results
+   */
+  public Enumerable<Object> query(final Session session, List<Map.Entry<String, Class>> fields,
+        List<String> predicates, List<String> order, String limit) {
+    // Build the type of the resulting row based on the provided fields
+    final RelDataTypeFactory typeFactory =
+        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    final RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
+    final RelDataType rowType = protoRowType.apply(typeFactory);
+    List<String> fieldNames = new ArrayList<String>();
+    for (Map.Entry<String, Class> field : fields) {
+      String fieldName = field.getKey();
+      fieldNames.add(fieldName);
+      SqlTypeName typeName = rowType.getField(fieldName, true, false).getType().getSqlTypeName();
+      fieldInfo.add(fieldName, typeFactory.createSqlType(typeName)).nullable(true);
+    }
+    final RelProtoDataType resultRowType = RelDataTypeImpl.proto(fieldInfo.build());
+
+    // Construct the list of fields to project
+    final String selectFields;
+    if (fields.isEmpty()) {
+      selectFields = "*";
+    } else {
+      selectFields = Util.toString(fieldNames, "", ", ", "");
+    }
+
+    // Combine all predicates conjunctively
+    String whereClause = "";
+    if (!predicates.isEmpty()) {
+      whereClause = " WHERE ";
+      whereClause += Util.toString(predicates, "", " AND ", "");
+    }
+
+    // Build and issue the query and return an Enumerator over the results
+    StringBuilder queryBuilder = new StringBuilder("SELECT ");
+    queryBuilder.append(selectFields);
+    queryBuilder.append(" FROM \"" + columnFamily + "\"");
+    queryBuilder.append(whereClause);
+    if (!order.isEmpty()) {
+      queryBuilder.append(Util.toString(order, " ORDER BY ", ", ", ""));
+    }
+    if (limit != null) {
+      queryBuilder.append(" LIMIT " + limit);
+    }
+    queryBuilder.append(" ALLOW FILTERING");
+    final String query = queryBuilder.toString();
+
+    return new AbstractEnumerable<Object>() {
+      public Enumerator<Object> enumerator() {
+        final ResultSet results = session.execute(query);
+        return new CassandraEnumerator(results, resultRowType);
+      }
+    };
+  }
+
+  public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
+      SchemaPlus schema, String tableName) {
+    return new CassandraQueryable<>(queryProvider, schema, this, tableName);
+  }
+
+  public RelNode toRel(
+      RelOptTable.ToRelContext context,
+      RelOptTable relOptTable) {
+    final RelOptCluster cluster = context.getCluster();
+    return new CassandraTableScan(cluster, cluster.traitSetOf(CassandraRel.CONVENTION),
+        relOptTable, this, null);
+  }
+
+  /** Implementation of {@link org.apache.calcite.linq4j.Queryable} based on
+   * a {@link org.apache.calcite.adapter.cassandra.CassandraTable}. */
+  public static class CassandraQueryable<T> extends AbstractTableQueryable<T> {
+    public CassandraQueryable(QueryProvider queryProvider, SchemaPlus schema,
+        CassandraTable table, String tableName) {
+      super(queryProvider, schema, table, tableName);
+    }
+
+    public Enumerator<T> enumerator() {
+      //noinspection unchecked
+      final Enumerable<T> enumerable =
+          (Enumerable<T>) getTable().query(getSession());
+      return enumerable.enumerator();
+    }
+
+    private CassandraTable getTable() {
+      return (CassandraTable) table;
+    }
+
+    private Session getSession() {
+      return schema.unwrap(CassandraSchema.class).session;
+    }
+
+    /** Called via code-generation.
+     *
+     * @see org.apache.calcite.adapter.cassandra.CassandraMethod#CASSANDRA_QUERYABLE_QUERY
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public Enumerable<Object> query(List<Map.Entry<String, Class>> fields,
+        List<String> predicates, List<String> order, String limit) {
+      return getTable().query(getSession(), fields, predicates, order, limit);
+    }
+  }
+}
+
+// End CassandraTable.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java
new file mode 100644
index 0000000..3197d93
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTableScan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+/**
+ * Relational expression representing a scan of a Cassandra collection.
+ */
+public class CassandraTableScan extends TableScan implements CassandraRel {
+  final CassandraTable cassandraTable;
+  final RelDataType projectRowType;
+
+  /**
+   * Creates a CassandraTableScan.
+   *
+   * @param cluster        Cluster
+   * @param traitSet       Traits
+   * @param table          Table
+   * @param cassandraTable Cassandra table
+   * @param projectRowType Fields and types to project; null to project raw row
+   */
+  protected CassandraTableScan(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable table, CassandraTable cassandraTable, RelDataType projectRowType) {
+    super(cluster, traitSet, table);
+    this.cassandraTable = cassandraTable;
+    this.projectRowType = projectRowType;
+
+    assert cassandraTable != null;
+    assert getConvention() == CassandraRel.CONVENTION;
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.isEmpty();
+    return this;
+  }
+
+  @Override public RelDataType deriveRowType() {
+    return projectRowType != null ? projectRowType : super.deriveRowType();
+  }
+
+  @Override public void register(RelOptPlanner planner) {
+    planner.addRule(CassandraToEnumerableConverterRule.INSTANCE);
+    for (RelOptRule rule : CassandraRules.RULES) {
+      planner.addRule(rule);
+    }
+  }
+
+  public void implement(Implementor implementor) {
+    implementor.cassandraTable = cassandraTable;
+    implementor.table = table;
+  }
+}
+
+// End CassandraTableScan.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
new file mode 100644
index 0000000..d79f047
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import java.util.AbstractList;
+import java.util.List;
+
+/**
+ * Relational expression representing a scan of a table in a Cassandra data source.
+ */
+public class CassandraToEnumerableConverter
+  extends ConverterImpl
+  implements EnumerableRel {
+  protected CassandraToEnumerableConverter(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelNode input) {
+    super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+  }
+
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new CassandraToEnumerableConverter(
+        getCluster(), traitSet, sole(inputs));
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    return super.computeSelfCost(planner, mq).multiplyBy(.1);
+  }
+
+  public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+    // Generates a call to "query" with the appropriate fields and predicates
+    final BlockBuilder list = new BlockBuilder();
+    final CassandraRel.Implementor cassandraImplementor = new CassandraRel.Implementor();
+    cassandraImplementor.visitChild(0, getInput());
+    final RelDataType rowType = getRowType();
+    final PhysType physType =
+        PhysTypeImpl.of(
+                implementor.getTypeFactory(), rowType,
+                pref.prefer(JavaRowFormat.ARRAY));
+    final Expression fields =
+        list.append("fields",
+            constantArrayList(
+                Pair.zip(CassandraRules.cassandraFieldNames(rowType),
+                    new AbstractList<Class>() {
+                      @Override public Class get(int index) {
+                        return physType.fieldClass(index);
+                      }
+
+                      @Override public int size() {
+                        return rowType.getFieldCount();
+                      }
+                    }),
+                Pair.class));
+    final Expression table =
+        list.append("table",
+            cassandraImplementor.table.getExpression(
+                CassandraTable.CassandraQueryable.class));
+    final Expression predicates =
+        list.append("predicates",
+            constantArrayList(cassandraImplementor.whereClause, String.class));
+    final Expression order =
+        list.append("order",
+            constantArrayList(cassandraImplementor.order, String.class));
+    final Expression limit =
+        list.append("limit",
+            Expressions.constant(cassandraImplementor.limitValue));
+    Expression enumerable =
+        list.append("enumerable",
+            Expressions.call(table,
+                CassandraMethod.CASSANDRA_QUERYABLE_QUERY.method, fields,
+                predicates, order, limit));
+    if (CalcitePrepareImpl.DEBUG) {
+      System.out.println("Cassandra: " + predicates);
+    }
+    Hook.QUERY_PLAN.run(predicates);
+    list.add(
+        Expressions.return_(null, enumerable));
+    return implementor.result(physType, list.toBlock());
+  }
+
+  /** E.g. {@code constantArrayList("x", "y")} returns
+   * "Arrays.asList('x', 'y')". */
+  private static <T> MethodCallExpression constantArrayList(List<T> values,
+      Class clazz) {
+    return Expressions.call(
+        BuiltInMethod.ARRAYS_AS_LIST.method,
+        Expressions.newArrayInit(clazz, constantList(values)));
+  }
+
+  /** E.g. {@code constantList("x", "y")} returns
+   * {@code {ConstantExpression("x"), ConstantExpression("y")}}. */
+  private static <T> List<Expression> constantList(List<T> values) {
+    return Lists.transform(values,
+        new Function<T, Expression>() {
+          public Expression apply(T a0) {
+            return Expressions.constant(a0);
+          }
+        });
+  }
+}
+
+// End CassandraToEnumerableConverter.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java
new file mode 100644
index 0000000..2ded8c8
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraToEnumerableConverterRule.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+
+/**
+ * Rule to convert a relational expression from
+ * {@link CassandraRel#CONVENTION} to {@link EnumerableConvention}.
+ */
+public class CassandraToEnumerableConverterRule extends ConverterRule {
+  public static final ConverterRule INSTANCE =
+    new CassandraToEnumerableConverterRule();
+
+  private CassandraToEnumerableConverterRule() {
+    super(RelNode.class, CassandraRel.CONVENTION, EnumerableConvention.INSTANCE,
+        "CassandraToEnumerableConverterRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+    return new CassandraToEnumerableConverter(rel.getCluster(), newTraitSet, rel);
+  }
+}
+
+// End CassandraToEnumerableConverterRule.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java
new file mode 100644
index 0000000..c4be45a
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Cassandra query provider.
+ *
+ * <p>There is one table for each Cassandra column family.</p>
+ */
+@PackageMarker
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
----------------------------------------------------------------------
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
new file mode 100644
index 0000000..8ef08c5
--- /dev/null
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.Test;
+
+/**
+ * Tests for the {@code org.apache.calcite.adapter.cassandra} package.
+ *
+ * <p>Before calling this test, you need to populate Cassandra with the
+ * "twissandra" data set, as follows:</p>
+ *
+ * <blockquote><code>
+ * git clone https://github.com/vlsi/calcite-test-dataset
+ * cd calcite-test-dataset
+ * mvn install
+ * </code></blockquote>
+ *
+ * This will create a virtual machine with Cassandra and test dataset.
+ */
+public class CassandraAdapterIT {
+  /** Connection factory based on the "mongo-zips" model. */
+  public static final ImmutableMap<String, String> TWISSANDRA =
+      ImmutableMap.of("model",
+          CassandraAdapterIT.class.getResource("/model.json")
+              .getPath());
+
+  /** Whether to run Cassandra tests. Enabled by default, however test is only
+   * included if "it" profile is activated ({@code -Pit}). To disable,
+   * specify {@code -Dcalcite.test.cassandra=false} on the Java command line. */
+  public static final boolean ENABLED =
+     Util.getBooleanProperty("calcite.test.cassandra", true);
+
+  /** Whether to run this test. */
+  protected boolean enabled() {
+    return ENABLED;
+  }
+
+  @Test public void testSelect() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select * from \"users\"")
+        .returnsCount(10);
+  }
+
+  @Test public void testFilter() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select * from \"userline\" where \"username\"='!PUBLIC!'")
+        .limit(1)
+        .returns("username=!PUBLIC!; time=e8754000-80b8-1fe9-8e73-e3698c967ddd; "
+            + "tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n")
+        .explainContains("PLAN=CassandraToEnumerableConverter\n"
+           + "  CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n"
+           + "    CassandraTableScan(table=[[twissandra, userline]]");
+  }
+
+  @Test public void testSort() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select * from \"userline\" where \"username\" = '!PUBLIC!' order by \"time\" desc")
+        .returnsCount(146)
+        .explainContains("PLAN=CassandraToEnumerableConverter\n"
+            + "  CassandraSort(sort0=[$1], dir0=[DESC])\n"
+            + "    CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
+  }
+
+  @Test public void testProject() {
+    CalciteAssert.that()
+        .enable(enabled())
+        .with(TWISSANDRA)
+        .query("select \"tweet_id\" from \"userline\" where \"username\" = '!PUBLIC!' limit 1")
+        .returns("tweet_id=f3c329de-d05b-11e5-b58b-90e2ba530b12\n")
+        .explainContains("PLAN=CassandraToEnumerableConverter\n"
+                + "  CassandraProject(tweet_id=[$2])\n"
+                + "    CassandraSort(fetch=[1])\n"
+                + "      CassandraFilter(condition=[=(CAST($0):CHAR(8) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", '!PUBLIC!')])\n");
+  }
+}
+
+// End CassandraAdapterIT.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/91887366/cassandra/src/test/resources/model.json
----------------------------------------------------------------------
diff --git a/cassandra/src/test/resources/model.json b/cassandra/src/test/resources/model.json
new file mode 100644
index 0000000..29ca31e
--- /dev/null
+++ b/cassandra/src/test/resources/model.json
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  version: '1.0',
+  defaultSchema: 'twissandra',
+  schemas: [
+    {
+      name: 'twissandra',
+      type: 'custom',
+      factory: 'org.apache.calcite.adapter.cassandra.CassandraSchemaFactory',
+      operand: {
+        host: 'localhost',
+        keyspace: 'twissandra'
+      }
+    }
+  ]
+}