You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by mm...@apache.org on 2020/02/06 18:18:57 UTC

[calcite] branch master updated: [CALCITE-3465] Add support for missing Cassandra 3.x data types (Alessandro Solimando)

This is an automated email from the ASF dual-hosted git repository.

mmior pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d37a0e  [CALCITE-3465] Add support for missing Cassandra 3.x data types (Alessandro Solimando)
5d37a0e is described below

commit 5d37a0e990a2b84c6616338490d7451065fdd3e5
Author: Alessandro Solimando <18...@users.noreply.github.com>
AuthorDate: Tue Nov 5 22:23:52 2019 +0100

    [CALCITE-3465] Add support for missing Cassandra 3.x data types (Alessandro Solimando)
    
    Closes #1774
---
 .../adapter/cassandra/CassandraEnumerator.java     |  71 +++++---
 .../calcite/adapter/cassandra/CassandraFilter.java |   4 +-
 .../calcite/adapter/cassandra/CassandraSchema.java |  98 +++++++---
 .../calcite/adapter/cassandra/CassandraTable.java  |  19 +-
 .../cassandra/CqlToSqlTypeConversionRules.java     |  94 ++++++++++
 .../calcite/test/AbstractCassandraAdapterTest.java | 110 ++++++++++++
 .../test/CassandraAdapterDataTypesTest.java        | 199 +++++++++++++++++++++
 .../apache/calcite/test/CassandraAdapterTest.java  |  86 +--------
 cassandra/src/test/resources/datatypes.cql         | 118 ++++++++++++
 cassandra/src/test/resources/model-datatypes.json  |  32 ++++
 10 files changed, 693 insertions(+), 138 deletions(-)

diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
index 96ca916..0413819 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
@@ -16,20 +16,26 @@
  */
 package org.apache.calcite.adapter.cassandra;
 
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.type.SqlTypeName;
 
-import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.LocalDate;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.TupleValue;
 
+import java.nio.ByteBuffer;
+import java.util.Date;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.stream.IntStream;
 
 /** Enumerator that reads from a Cassandra column family. */
 class CassandraEnumerator implements Enumerator<Object> {
@@ -58,12 +64,12 @@ class CassandraEnumerator implements Enumerator<Object> {
   public Object current() {
     if (fieldTypes.size() == 1) {
       // If we just have one field, produce it directly
-      return currentRowField(0, fieldTypes.get(0).getType().getSqlTypeName());
+      return currentRowField(0);
     } else {
       // Build an array with all fields in this row
       Object[] row = new Object[fieldTypes.size()];
       for (int i = 0; i < fieldTypes.size(); i++) {
-        row[i] = currentRowField(i, fieldTypes.get(i).getType().getSqlTypeName());
+        row[i] = currentRowField(i);
       }
 
       return row;
@@ -73,25 +79,48 @@ class CassandraEnumerator implements Enumerator<Object> {
   /** Get a field for the current row from the underlying object.
    *
    * @param index Index of the field within the Row object
-   * @param typeName Type of the field in this row
    */
-  private Object currentRowField(int index, SqlTypeName typeName) {
-    DataType type = current.getColumnDefinitions().getType(index);
-    if (type == DataType.ascii() || type == DataType.text() || type == DataType.varchar()) {
-      return current.getString(index);
-    } else if (type == DataType.cint() || type == DataType.varint()) {
-      return current.getInt(index);
-    } else if (type == DataType.bigint()) {
-      return current.getLong(index);
-    } else if (type == DataType.cdouble()) {
-      return current.getDouble(index);
-    } else if (type == DataType.cfloat()) {
-      return current.getFloat(index);
-    } else if (type == DataType.uuid() || type == DataType.timeuuid()) {
-      return current.getUUID(index).toString();
-    } else {
-      return null;
+  private Object currentRowField(int index) {
+    final Object o =  current.get(index,
+        CassandraSchema.CODEC_REGISTRY.codecFor(
+            current.getColumnDefinitions().getType(index)));
+
+    return convertToEnumeratorObject(o);
+  }
+
+  /** Convert an object into the expected internal representation.
+   *
+   * @param obj Object to convert, if needed
+   */
+  private Object convertToEnumeratorObject(Object obj) {
+    if (obj instanceof ByteBuffer) {
+      ByteBuffer buf = (ByteBuffer) obj;
+      byte [] bytes = new byte[buf.remaining()];
+      buf.get(bytes, 0, bytes.length);
+      return new ByteString(bytes);
+    } else if (obj instanceof LocalDate) {
+      // converts dates to the expected numeric format
+      return ((LocalDate) obj).getMillisSinceEpoch()
+          / DateTimeUtils.MILLIS_PER_DAY;
+    } else if (obj instanceof Date) {
+      return ((Date) obj).toInstant().toEpochMilli();
+    } else if (obj instanceof LinkedHashSet) {
+      // MULTISET is handled as an array
+      return ((LinkedHashSet) obj).toArray();
+    } else if (obj instanceof TupleValue) {
+      // STRUCT can be handled as an array
+      final TupleValue tupleValue = (TupleValue) obj;
+      int numComponents = tupleValue.getType().getComponentTypes().size();
+      return IntStream.range(0, numComponents)
+          .mapToObj(i ->
+              tupleValue.get(i,
+                  CassandraSchema.CODEC_REGISTRY.codecFor(
+                      tupleValue.getType().getComponentTypes().get(i)))
+          ).map(this::convertToEnumeratorObject)
+          .toArray();
     }
+
+    return obj;
   }
 
   public boolean moveNext() {
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
index 2d80426..31bd014 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
@@ -181,9 +181,7 @@ public class CassandraFilter extends Filter implements CassandraRel {
      */
     private static String literalValue(RexLiteral literal) {
       Object value = literal.getValue2();
-      StringBuilder buf = new StringBuilder();
-      buf.append(value);
-      return buf.toString();
+      return String.valueOf(value);
     }
 
     /** Translate a conjunctive predicate to a CQL string.
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
index 85c4c6d..b0a2458 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -19,6 +19,7 @@ package org.apache.calcite.adapter.cassandra;
 import org.apache.calcite.avatica.util.Casing;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -43,12 +44,14 @@ import org.apache.calcite.util.trace.CalciteTrace;
 import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ClusteringOrder;
+import com.datastax.driver.core.CodecRegistry;
 import com.datastax.driver.core.ColumnMetadata;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.MaterializedViewMetadata;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TupleType;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
@@ -58,6 +61,8 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Schema mapped onto a Cassandra column family
@@ -69,6 +74,10 @@ public class CassandraSchema extends AbstractSchema {
   final String name;
   final Hook.Closeable hook;
 
+  static final CodecRegistry CODEC_REGISTRY = CodecRegistry.DEFAULT_INSTANCE;
+  static final CqlToSqlTypeConversionRules CQL_TO_SQL_TYPE =
+      CqlToSqlTypeConversionRules.instance();
+
   protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
 
   private static final int DEFAULT_CASSANDRA_PORT = 9042;
@@ -160,28 +169,68 @@ public class CassandraSchema extends AbstractSchema {
         new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
     final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
     for (ColumnMetadata column : columns) {
-      final String columnName = column.getName();
-      final DataType type = column.getType();
-
-      // TODO: This mapping of types can be done much better
-      SqlTypeName typeName = SqlTypeName.ANY;
-      if (type == DataType.uuid() || type == DataType.timeuuid()) {
-        // We currently rely on this in CassandraFilter to detect UUID columns.
-        // That is, these fixed length literals should be unquoted in CQL.
-        typeName = SqlTypeName.CHAR;
-      } else if (type == DataType.ascii() || type == DataType.text()
-            || type == DataType.varchar()) {
-        typeName = SqlTypeName.VARCHAR;
-      } else if (type == DataType.cint() || type == DataType.varint()) {
-        typeName = SqlTypeName.INTEGER;
-      } else if (type == DataType.bigint()) {
-        typeName = SqlTypeName.BIGINT;
-      } else if (type == DataType.cdouble() || type == DataType.cfloat()
-          || type == DataType.decimal()) {
-        typeName = SqlTypeName.DOUBLE;
-      }
+      final SqlTypeName typeName =
+          CQL_TO_SQL_TYPE.lookup(column.getType().getName());
+
+      switch (typeName) {
+      case ARRAY:
+        final SqlTypeName arrayInnerType = CQL_TO_SQL_TYPE.lookup(
+            column.getType().getTypeArguments().get(0).getName());
+
+        fieldInfo.add(column.getName(),
+            typeFactory.createArrayType(
+                typeFactory.createSqlType(arrayInnerType), -1))
+            .nullable(true);
+
+        break;
+      case MULTISET:
+        final SqlTypeName multiSetInnerType = CQL_TO_SQL_TYPE.lookup(
+            column.getType().getTypeArguments().get(0).getName());
+
+        fieldInfo.add(column.getName(),
+            typeFactory.createMultisetType(
+                typeFactory.createSqlType(multiSetInnerType), -1)
+        ).nullable(true);
+
+        break;
+      case MAP:
+        final List<DataType> types = column.getType().getTypeArguments();
+        final SqlTypeName keyType =
+            CQL_TO_SQL_TYPE.lookup(types.get(0).getName());
+        final SqlTypeName valueType =
+            CQL_TO_SQL_TYPE.lookup(types.get(1).getName());
+
+        fieldInfo.add(column.getName(),
+            typeFactory.createMapType(
+                typeFactory.createSqlType(keyType),
+                typeFactory.createSqlType(valueType))
+        ).nullable(true);
+
+        break;
+      case STRUCTURED:
+        assert DataType.Name.TUPLE == column.getType().getName();
+
+        final List<DataType> typeArgs =
+            ((TupleType) column.getType()).getComponentTypes();
+        final List<Map.Entry<String, RelDataType>> typesList =
+            IntStream.range(0, typeArgs.size())
+                .mapToObj(
+                    i -> new Pair<>(
+                        Integer.toString(i + 1), // 1 indexed (as ARRAY)
+                        typeFactory.createSqlType(
+                            CQL_TO_SQL_TYPE.lookup(typeArgs.get(i).getName()))))
+                .collect(Collectors.toList());
+
+        fieldInfo.add(column.getName(),
+            typeFactory.createStructType(typesList))
+            .nullable(true);
+
+        break;
+      default:
+        fieldInfo.add(column.getName(), typeName).nullable(true);
 
-      fieldInfo.add(columnName, typeFactory.createSqlType(typeName)).nullable(true);
+        break;
+      }
     }
 
     return RelDataTypeImpl.proto(fieldInfo.build());
@@ -267,12 +316,15 @@ public class CassandraSchema extends AbstractSchema {
       }
       queryBuilder.append(Util.toString(columnNames, "", ", ", ""));
 
-      queryBuilder.append(" FROM \"" + tableName + "\"");
+      queryBuilder.append(" FROM \"")
+          .append(tableName)
+          .append("\"");
 
       // Get the where clause from the system schema
       String whereQuery = "SELECT where_clause from system_schema.views "
           + "WHERE keyspace_name='" + keyspace + "' AND view_name='" + view.getName() + "'";
-      queryBuilder.append(" WHERE " + session.execute(whereQuery).one().getString(0));
+      queryBuilder.append(" WHERE ")
+          .append(session.execute(whereQuery).one().getString(0));
 
       // Parse and unparse the view query to get properly quoted field names
       String query = queryBuilder.toString();
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
index fa7d0b1..8b4112a 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -36,7 +36,6 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTableQueryable;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 
@@ -118,10 +117,9 @@ public class CassandraTable extends AbstractQueryableTable
     final RelDataType rowType = getRowType(typeFactory);
 
     Function1<String, Void> addField = fieldName -> {
-      SqlTypeName typeName =
-          rowType.getField(fieldName, true, false).getType().getSqlTypeName();
-      fieldInfo.add(fieldName, typeFactory.createSqlType(typeName))
-          .nullable(true);
+      RelDataType relDataType =
+          rowType.getField(fieldName, true, false).getType();
+      fieldInfo.add(fieldName, relDataType).nullable(true);
       return null;
     };
 
@@ -172,9 +170,11 @@ public class CassandraTable extends AbstractQueryableTable
 
     // Build and issue the query and return an Enumerator over the results
     StringBuilder queryBuilder = new StringBuilder("SELECT ");
-    queryBuilder.append(selectString);
-    queryBuilder.append(" FROM \"" + columnFamily + "\"");
-    queryBuilder.append(whereClause);
+    queryBuilder.append(selectString)
+        .append(" FROM \"")
+        .append(columnFamily)
+        .append("\"")
+        .append(whereClause);
     if (!order.isEmpty()) {
       queryBuilder.append(Util.toString(order, " ORDER BY ", ", ", ""));
     }
@@ -184,7 +184,8 @@ public class CassandraTable extends AbstractQueryableTable
       limit += fetch;
     }
     if (limit > 0) {
-      queryBuilder.append(" LIMIT " + limit);
+      queryBuilder.append(" LIMIT ")
+          .append(limit);
     }
     queryBuilder.append(" ALLOW FILTERING");
     final String query = queryBuilder.toString();
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CqlToSqlTypeConversionRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CqlToSqlTypeConversionRules.java
new file mode 100644
index 0000000..b55683b
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CqlToSqlTypeConversionRules.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.datastax.driver.core.DataType;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/**
+ * CqlToSqlTypeConversionRules defines mappings from CQL types to
+ * corresponding SQL types.
+ */
+public class CqlToSqlTypeConversionRules {
+  //~ Static fields/initializers ---------------------------------------------
+
+  private static final CqlToSqlTypeConversionRules INSTANCE =
+      new CqlToSqlTypeConversionRules();
+
+  //~ Instance fields --------------------------------------------------------
+
+  private final Map<DataType.Name, SqlTypeName> rules =
+      ImmutableMap.<DataType.Name, SqlTypeName>builder()
+          .put(DataType.Name.UUID, SqlTypeName.CHAR)
+          .put(DataType.Name.TIMEUUID, SqlTypeName.CHAR)
+
+          .put(DataType.Name.ASCII, SqlTypeName.VARCHAR)
+          .put(DataType.Name.TEXT, SqlTypeName.VARCHAR)
+          .put(DataType.Name.VARCHAR, SqlTypeName.VARCHAR)
+
+          .put(DataType.Name.INT, SqlTypeName.INTEGER)
+          .put(DataType.Name.VARINT, SqlTypeName.INTEGER)
+          .put(DataType.Name.BIGINT, SqlTypeName.BIGINT)
+          .put(DataType.Name.TINYINT, SqlTypeName.TINYINT)
+          .put(DataType.Name.SMALLINT, SqlTypeName.SMALLINT)
+
+          .put(DataType.Name.DOUBLE, SqlTypeName.DOUBLE)
+          .put(DataType.Name.FLOAT, SqlTypeName.REAL)
+          .put(DataType.Name.DECIMAL, SqlTypeName.DOUBLE)
+
+          .put(DataType.Name.BLOB, SqlTypeName.VARBINARY)
+
+          .put(DataType.Name.BOOLEAN, SqlTypeName.BOOLEAN)
+
+          .put(DataType.Name.COUNTER, SqlTypeName.BIGINT)
+
+          // number of nanoseconds since midnight
+          .put(DataType.Name.TIME, SqlTypeName.BIGINT)
+          .put(DataType.Name.DATE, SqlTypeName.DATE)
+          .put(DataType.Name.TIMESTAMP, SqlTypeName.TIMESTAMP)
+
+          .put(DataType.Name.MAP, SqlTypeName.MAP)
+          .put(DataType.Name.LIST, SqlTypeName.ARRAY)
+          .put(DataType.Name.SET, SqlTypeName.MULTISET)
+          .put(DataType.Name.TUPLE, SqlTypeName.STRUCTURED)
+          .build();
+
+  //~ Methods ----------------------------------------------------------------
+
+  /**
+   * Returns the
+   * {@link org.apache.calcite.util.Glossary#SINGLETON_PATTERN singleton}
+   * instance.
+   */
+  public static CqlToSqlTypeConversionRules instance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns a corresponding {@link SqlTypeName} for a given CQL type name.
+   *
+   * @param name the CQL type name to lookup
+   * @return a corresponding SqlTypeName if found, ANY otherwise
+   */
+  public SqlTypeName lookup(DataType.Name name) {
+    return rules.getOrDefault(name, SqlTypeName.ANY);
+  }
+}
diff --git a/cassandra/src/test/java/org/apache/calcite/test/AbstractCassandraAdapterTest.java b/cassandra/src/test/java/org/apache/calcite/test/AbstractCassandraAdapterTest.java
new file mode 100644
index 0000000..ea55f2d
--- /dev/null
+++ b/cassandra/src/test/java/org/apache/calcite/test/AbstractCassandraAdapterTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import org.apache.calcite.config.CalciteSystemProperty;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.Sources;
+import org.apache.calcite.util.TestUtil;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.cassandraunit.CassandraCQLUnit;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.junit.BeforeClass;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for the {@code org.apache.calcite.adapter.cassandra} package.
+ *
+ * <p>Common initialization functions for Cassandra tests.
+ */
+public abstract class AbstractCassandraAdapterTest {
+
+  /** Returns the connection properties for the model at the given resource path. */
+  static ImmutableMap<String, String> getDataset(String resourcePath) {
+    return ImmutableMap.of("model",
+        Sources.of(CassandraAdapterTest.class.getResource(resourcePath))
+            .file().getAbsolutePath());
+  }
+
+  /**
+   * Whether to run this test.
+   * <p>Enabled by default, unless explicitly disabled
+   * from command line ({@code -Dcalcite.test.cassandra=false}) or running on incompatible JDK
+   * version (see below).
+   *
+   * <p>As of this writing Cassandra 4.x is not yet released and we're using 3.x
+   * (which fails on JDK11+). All cassandra tests will be skipped if
+   * running on JDK11+.
+   *
+   * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-9608">CASSANDRA-9608</a>
+   * @return {@code true} if test is compatible with current environment,
+   *         {@code false} otherwise
+   */
+  private static boolean enabled() {
+    final boolean enabled = CalciteSystemProperty.TEST_CASSANDRA.value();
+    Bug.upgrade("remove JDK version check once current adapter supports Cassandra 4.x");
+    final boolean compatibleJdk = TestUtil.getJavaMajorVersion() < 11;
+    return enabled && compatibleJdk;
+  }
+
+  static ExternalResource initCassandraIfEnabled(String dataSetLocation) {
+    if (!enabled()) {
+      // Return NOP resource (to avoid nulls)
+      return new ExternalResource() {
+        @Override public Statement apply(final Statement base, final Description description) {
+          return super.apply(base, description);
+        }
+      };
+    }
+
+    String configurationFileName = null; // use default one
+    // Apache Jenkins often fails with
+    // CassandraAdapterTest Cassandra daemon did not start within timeout (20 sec by default)
+    long startUpTimeoutMillis = TimeUnit.SECONDS.toMillis(60);
+
+    CassandraCQLUnit rule = new CassandraCQLUnit(
+        new ClassPathCQLDataSet(dataSetLocation),
+        configurationFileName,
+        startUpTimeoutMillis);
+
+    // This static init is necessary otherwise tests fail with CassandraUnit in IntelliJ (jdk10)
+    // should be called right after constructor
+    // NullPointerException for DatabaseDescriptor.getDiskFailurePolicy
+    // for more info see
+    // https://github.com/jsevellec/cassandra-unit/issues/249
+    // https://github.com/jsevellec/cassandra-unit/issues/221
+    DatabaseDescriptor.daemonInitialization();
+
+    return rule;
+  }
+
+  @BeforeClass
+  public static void setUp() {
+    // run tests only if explicitly enabled
+    assumeTrue("test explicitly disabled", enabled());
+  }
+}
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterDataTypesTest.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterDataTypesTest.java
new file mode 100644
index 0000000..f33ce42
--- /dev/null
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterDataTypesTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Tests for the {@code org.apache.calcite.adapter.cassandra} package related to data types.
+ *
+ * <p>Will start embedded cassandra cluster and populate it from local {@code datatypes.cql} file.
+ * All configuration files are located in test classpath.
+ *
+ * <p>Note that tests will be skipped if running on JDK11+
+ * (which is not yet supported by cassandra) see
+ * <a href="https://issues.apache.org/jira/browse/CASSANDRA-9608">CASSANDRA-9608</a>.
+ *
+ */
+
+@Execution(ExecutionMode.SAME_THREAD)
+public class CassandraAdapterDataTypesTest extends AbstractCassandraAdapterTest {
+
+  @ClassRule
+  public static final ExternalResource RULE =
+          initCassandraIfEnabled("datatypes.cql");
+
+  /** Connection factory based on the "model-datatypes" model. */
+  private static final ImmutableMap<String, String> DTCASSANDRA =
+          getDataset("/model-datatypes.json");
+
+  @Test public void testSimpleTypesRowType() {
+    CalciteAssert.that()
+            .with(DTCASSANDRA)
+            .query("select * from \"test_simple\"")
+            .typeIs("[f_int INTEGER"
+                + ", f_ascii VARCHAR"
+                + ", f_bigint BIGINT"
+                + ", f_blob VARBINARY"
+                + ", f_boolean BOOLEAN"
+                + ", f_date DATE"
+                + ", f_decimal DOUBLE"
+                + ", f_double DOUBLE"
+                + ", f_duration ANY"
+                + ", f_float REAL"
+                + ", f_inet ANY"
+                + ", f_smallint SMALLINT"
+                + ", f_text VARCHAR"
+                + ", f_time BIGINT"
+                + ", f_timestamp TIMESTAMP"
+                + ", f_timeuuid CHAR"
+                + ", f_tinyint TINYINT"
+                + ", f_uuid CHAR"
+                + ", f_varchar VARCHAR"
+                + ", f_varint INTEGER]");
+  }
+
+  @Test public void testSimpleTypesValues() {
+    CalciteAssert.that()
+        .with(DTCASSANDRA)
+        .query("select * from \"test_simple\"")
+        .returns("f_int=0"
+            + "; f_ascii=abcdefg"
+            + "; f_bigint=3000000000"
+            + "; f_blob=20"
+            + "; f_boolean=true"
+            + "; f_date=2015-05-03"
+            + "; f_decimal=2.1"
+            + "; f_double=2.0"
+            + "; f_duration=89h9m9s"
+            + "; f_float=5.1"
+            + "; f_inet=/192.168.0.1"
+            + "; f_smallint=5"
+            + "; f_text=abcdefg"
+            + "; f_time=48654234000000"
+            + "; f_timestamp=2011-02-03 04:05:00"
+            + "; f_timeuuid=8ac6d1dc-fbeb-11e9-8f0b-362b9e155667"
+            + "; f_tinyint=0"
+            + "; f_uuid=123e4567-e89b-12d3-a456-426655440000"
+            + "; f_varchar=abcdefg"
+            + "; f_varint=10\n");
+  }
+
+  @Test public void testCounterRowType() {
+    CalciteAssert.that()
+            .with(DTCASSANDRA)
+            .query("select * from \"test_counter\"")
+            .typeIs("[f_int INTEGER, f_counter BIGINT]");
+  }
+
+  @Test public void testCounterValues() {
+    CalciteAssert.that()
+        .with(DTCASSANDRA)
+        .query("select * from \"test_counter\"")
+        .returns("f_int=1; f_counter=1\n");
+  }
+
+  @Test public void testCollectionsRowType() {
+    CalciteAssert.that()
+            .with(DTCASSANDRA)
+            .query("select * from \"test_collections\"")
+            .typeIs("[f_int INTEGER"
+                + ", f_list INTEGER ARRAY"
+                + ", f_map (VARCHAR, VARCHAR) MAP"
+                + ", f_set DOUBLE MULTISET"
+                + ", f_tuple STRUCT]");
+  }
+
+  @Test public void testCollectionsValues() {
+    CalciteAssert.that()
+            .with(DTCASSANDRA)
+            .query("select * from \"test_collections\"")
+            .returns("f_int=0"
+                + "; f_list=[1, 2, 3]"
+                + "; f_map={k1=v1, k2=v2}"
+                + "; f_set=[2.0, 3.1]"
+                + "; f_tuple={3000000000, 30ff87, 2015-05-03 13:30:54.234}"
+                + "\n");
+  }
+
+  @Test public void testCollectionsInnerRowType() {
+    CalciteAssert.that()
+        .with(DTCASSANDRA)
+        .query("select \"f_list\"[1], "
+            + "\"f_map\"['k1'], "
+            + "\"f_tuple\"['1'], "
+            + "\"f_tuple\"['2'], "
+            + "\"f_tuple\"['3']"
+            + " from \"test_collections\"")
+        .typeIs("[EXPR$0 INTEGER"
+            + ", EXPR$1 VARCHAR"
+            + ", EXPR$2 BIGINT"
+            + ", EXPR$3 VARBINARY"
+            + ", EXPR$4 TIMESTAMP]");
+  }
+
+  // ignored as tuple elements return 'null' when accessed in the select statement
+  @Ignore
+  @Test public void testCollectionsInnerValues() {
+    CalciteAssert.that()
+        .with(DTCASSANDRA)
+        .query("select \"f_list\"[1], "
+            + "\"f_map\"['k1'], "
+            + "\"f_tuple\"['1'], "
+            + "\"f_tuple\"['2'], "
+            + "\"f_tuple\"['3']"
+            + " from \"test_collections\"")
+        .returns("EXPR$0=1"
+            + "; EXPR$1=v1"
+            + "; EXPR$2=3000000000"
+            + "; EXPR$3=30ff87"
+            + "; EXPR$4=2015-05-03 13:30:54.234");
+  }
+
+  // frozen collections should not affect the row type
+  @Test public void testFrozenCollectionsRowType() {
+    CalciteAssert.that()
+        .with(DTCASSANDRA)
+        .query("select * from \"test_frozen_collections\"")
+        .typeIs("[f_int INTEGER"
+            + ", f_list INTEGER ARRAY"
+            + ", f_map (VARCHAR, VARCHAR) MAP"
+            + ", f_set DOUBLE MULTISET"
+            + ", f_tuple STRUCT]");
+    // we should test (BIGINT, VARBINARY, TIMESTAMP) STRUCT but inner types are not exposed
+  }
+
+  // frozen collections should not affect the result set
+  @Test public void testFrozenCollectionsValues() {
+    CalciteAssert.that()
+        .with(DTCASSANDRA)
+        .query("select * from \"test_frozen_collections\"")
+        .returns("f_int=0"
+            + "; f_list=[1, 2, 3]"
+            + "; f_map={k1=v1, k2=v2}"
+            + "; f_set=[2.0, 3.1]"
+            + "; f_tuple={3000000000, 30ff87, 2015-05-03 13:30:54.234}"
+            + "\n");
+  }
+}
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java
index fcbc88c..92f018b 100644
--- a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java
@@ -16,29 +16,13 @@
  */
 package org.apache.calcite.test;
 
-import org.apache.calcite.config.CalciteSystemProperty;
-import org.apache.calcite.util.Bug;
-import org.apache.calcite.util.Sources;
-import org.apache.calcite.util.TestUtil;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
 import com.google.common.collect.ImmutableMap;
 
-import org.cassandraunit.CassandraCQLUnit;
-import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
 import org.junit.rules.ExternalResource;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests for the {@code org.apache.calcite.adapter.cassandra} package.
@@ -52,79 +36,17 @@ import static org.junit.Assume.assumeTrue;
  *
  */
 
-// force tests to run sequentially (maven surefire and failsafe are running them in parallel)
-// seems like some of our code is sharing static variables (like Hooks) which causes tests
-// to fail non-deterministically (flaky tests).
 @Execution(ExecutionMode.SAME_THREAD)
-public class CassandraAdapterTest {
+public class CassandraAdapterTest extends AbstractCassandraAdapterTest {
 
   @ClassRule
-  public static final ExternalResource RULE = initCassandraIfEnabled();
+  public static final ExternalResource RULE =
+          initCassandraIfEnabled("twissandra.cql");
 
   /** Connection factory based on the "mongo-zips" model. */
   private static final ImmutableMap<String, String> TWISSANDRA =
-      ImmutableMap.of("model",
-          Sources.of(
-              CassandraAdapterTest.class.getResource("/model.json"))
-              .file().getAbsolutePath());
-
-  /**
-   * Whether to run this test.
-   * <p>Enabled by default, unless explicitly disabled
-   * from command line ({@code -Dcalcite.test.cassandra=false}) or running on incompatible JDK
-   * version (see below).
-   *
-   * <p>As of this wiring Cassandra 4.x is not yet released and we're using 3.x
-   * (which fails on JDK11+). All cassandra tests will be skipped if
-   * running on JDK11+.
-   *
-   * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-9608">CASSANDRA-9608</a>
-   * @return {@code true} if test is compatible with current environment,
-   *         {@code false} otherwise
-   */
-  private static boolean enabled() {
-    final boolean enabled = CalciteSystemProperty.TEST_CASSANDRA.value();
-    Bug.upgrade("remove JDK version check once current adapter supports Cassandra 4.x");
-    final boolean compatibleJdk = TestUtil.getJavaMajorVersion() < 11;
-    return enabled && compatibleJdk;
-  }
-
-  private static ExternalResource initCassandraIfEnabled() {
-    if (!enabled()) {
-      // Return NOP resource (to avoid nulls)
-      return new ExternalResource() {
-        @Override public Statement apply(final Statement base, final Description description) {
-          return super.apply(base, description);
-        }
-      };
-    }
-
-    String configurationFileName = null; // use default one
-    // Apache Jenkins often fails with
-    // CassandraAdapterTest Cassandra daemon did not start within timeout (20 sec by default)
-    long startUpTimeoutMillis = TimeUnit.SECONDS.toMillis(60);
-
-    CassandraCQLUnit rule = new CassandraCQLUnit(
-        new ClassPathCQLDataSet("twissandra.cql"),
-        configurationFileName,
-        startUpTimeoutMillis);
-
-    // This static init is necessary otherwise tests fail with CassandraUnit in IntelliJ (jdk10)
-    // should be called right after constructor
-    // NullPointerException for DatabaseDescriptor.getDiskFailurePolicy
-    // for more info see
-    // https://github.com/jsevellec/cassandra-unit/issues/249
-    // https://github.com/jsevellec/cassandra-unit/issues/221
-    DatabaseDescriptor.daemonInitialization();
-
-    return rule;
-  }
+          getDataset("/model.json");
 
-  @BeforeClass
-  public static void setUp() {
-    // run tests only if explicitly enabled
-    assumeTrue("test explicitly disabled", enabled());
-  }
 
   @Test public void testSelect() {
     CalciteAssert.that()
diff --git a/cassandra/src/test/resources/datatypes.cql b/cassandra/src/test/resources/datatypes.cql
new file mode 100644
index 0000000..045ee08
--- /dev/null
+++ b/cassandra/src/test/resources/datatypes.cql
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE KEYSPACE dtcassandra
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+
+USE dtcassandra;
+/* Columns covering the simple (non-collection) CQL types; one row is inserted below. */
+CREATE TABLE test_simple (
+    f_int int PRIMARY KEY,
+    f_uuid uuid,
+    f_timeuuid timeuuid,
+    f_ascii ascii,
+    f_text text,
+    f_varchar varchar,
+    f_varint varint,
+    f_bigint bigint,
+    f_double double,
+    f_float float,
+    f_decimal decimal,
+    f_blob blob,
+    f_boolean boolean,
+    f_date date,
+    f_inet inet,
+    f_smallint smallint,
+    f_time time,
+    f_timestamp timestamp,
+    f_tinyint tinyint,
+    f_duration duration
+);
+
+INSERT INTO test_simple(f_int,
+                        f_uuid,
+                        f_timeuuid,
+                        f_ascii,
+                        f_text,
+                        f_varchar,
+                        f_varint,
+                        f_bigint,
+                        f_double,
+                        f_float,
+                        f_decimal,
+                        f_blob,
+                        f_boolean,
+                        f_date,
+                        f_inet,
+                        f_smallint,
+                        f_time,
+                        f_timestamp,
+                        f_tinyint,
+                        f_duration) VALUES (0,
+                                            123e4567-e89b-12d3-a456-426655440000,
+                                            8ac6d1dc-fbeb-11e9-8f0b-362b9e155667,
+                                            'abcdefg',
+                                            'abcdefg',
+                                            'abcdefg',
+                                            10,
+                                            3000000000,
+                                            2.0,
+                                            5.1,
+                                            2.1,
+                                            0x20,
+                                            true,
+                                            '2015-05-03',
+                                            '192.168.0.1',
+                                            5,
+                                            '13:30:54.234',
+                                            '2011-02-03T04:05:00.000+0000',
+                                            0,
+                                            P0000-00-00T89:09:09);
+
+/* Counter table: its only row is written by the UPDATE increment below. */
+CREATE TABLE test_counter ( f_counter counter, f_int int PRIMARY KEY );
+
+UPDATE test_counter SET f_counter = f_counter + 1 WHERE f_int = 1;
+
+/* Non-frozen list, map, set and tuple columns. */
+CREATE TABLE test_collections (
+    f_int int PRIMARY KEY,
+    f_list list<int>,
+    f_map map<text, text>,
+    f_set set<double>,
+    f_tuple tuple<bigint, blob, timestamp>
+);
+
+INSERT INTO test_collections (f_int, f_list, f_map, f_set, f_tuple) VALUES (0,
+                                                                            [1,2,3],
+                                                                            {'k1':'v1', 'k2':'v2'},
+                                                                            {2.0, 3.1},
+                                                                            (3000000000, 0x30FF87, '2015-05-03 13:30:54.234'));
+
+/* Same column layout as test_collections, with each collection type wrapped in frozen<>. */
+CREATE TABLE test_frozen_collections (
+    f_int int PRIMARY KEY,
+    f_list frozen<list<int>>,
+    f_map frozen<map<text, text>>,
+    f_set frozen<set<double>>,
+    f_tuple frozen<tuple<bigint, blob, timestamp>>
+);
+
+INSERT INTO test_frozen_collections (f_int, f_list, f_map, f_set, f_tuple) VALUES (0,
+                                                                                   [1,2,3],
+                                                                                   {'k1':'v1', 'k2':'v2'},
+                                                                                   {2.0, 3.1},
+                                                                                   (3000000000, 0x30FF87, '2015-05-03 13:30:54.234'));
diff --git a/cassandra/src/test/resources/model-datatypes.json b/cassandra/src/test/resources/model-datatypes.json
new file mode 100644
index 0000000..7386122
--- /dev/null
+++ b/cassandra/src/test/resources/model-datatypes.json
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "version": "1.0",
+  "defaultSchema": "dtcassandra",
+  "schemas": [
+    {
+      "name": "dtcassandra",
+      "type": "custom",
+      "factory": "org.apache.calcite.adapter.cassandra.CassandraSchemaFactory",
+      "operand": {
+        "host": "localhost",
+        "port": 9142,
+        "keyspace": "dtcassandra"
+      }
+    }
+  ]
+}