You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by mm...@apache.org on 2020/02/06 18:18:57 UTC
[calcite] branch master updated: [CALCITE-3465] Add support for
missing Cassandra 3.x data types (Alessandro Solimando)
This is an automated email from the ASF dual-hosted git repository.
mmior pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 5d37a0e [CALCITE-3465] Add support for missing Cassandra 3.x data types (Alessandro Solimando)
5d37a0e is described below
commit 5d37a0e990a2b84c6616338490d7451065fdd3e5
Author: Alessandro Solimando <18...@users.noreply.github.com>
AuthorDate: Tue Nov 5 22:23:52 2019 +0100
[CALCITE-3465] Add support for missing Cassandra 3.x data types (Alessandro Solimando)
Closes #1774
---
.../adapter/cassandra/CassandraEnumerator.java | 71 +++++---
.../calcite/adapter/cassandra/CassandraFilter.java | 4 +-
.../calcite/adapter/cassandra/CassandraSchema.java | 98 +++++++---
.../calcite/adapter/cassandra/CassandraTable.java | 19 +-
.../cassandra/CqlToSqlTypeConversionRules.java | 94 ++++++++++
.../calcite/test/AbstractCassandraAdapterTest.java | 110 ++++++++++++
.../test/CassandraAdapterDataTypesTest.java | 199 +++++++++++++++++++++
.../apache/calcite/test/CassandraAdapterTest.java | 86 +--------
cassandra/src/test/resources/datatypes.cql | 118 ++++++++++++
cassandra/src/test/resources/model-datatypes.json | 32 ++++
10 files changed, 693 insertions(+), 138 deletions(-)
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
index 96ca916..0413819 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
@@ -16,20 +16,26 @@
*/
package org.apache.calcite.adapter.cassandra;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.type.SqlTypeName;
-import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.TupleValue;
+import java.nio.ByteBuffer;
+import java.util.Date;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.stream.IntStream;
/** Enumerator that reads from a Cassandra column family. */
class CassandraEnumerator implements Enumerator<Object> {
@@ -58,12 +64,12 @@ class CassandraEnumerator implements Enumerator<Object> {
public Object current() {
if (fieldTypes.size() == 1) {
// If we just have one field, produce it directly
- return currentRowField(0, fieldTypes.get(0).getType().getSqlTypeName());
+ return currentRowField(0);
} else {
// Build an array with all fields in this row
Object[] row = new Object[fieldTypes.size()];
for (int i = 0; i < fieldTypes.size(); i++) {
- row[i] = currentRowField(i, fieldTypes.get(i).getType().getSqlTypeName());
+ row[i] = currentRowField(i);
}
return row;
@@ -73,25 +79,48 @@ class CassandraEnumerator implements Enumerator<Object> {
/** Get a field for the current row from the underlying object.
*
* @param index Index of the field within the Row object
- * @param typeName Type of the field in this row
*/
- private Object currentRowField(int index, SqlTypeName typeName) {
- DataType type = current.getColumnDefinitions().getType(index);
- if (type == DataType.ascii() || type == DataType.text() || type == DataType.varchar()) {
- return current.getString(index);
- } else if (type == DataType.cint() || type == DataType.varint()) {
- return current.getInt(index);
- } else if (type == DataType.bigint()) {
- return current.getLong(index);
- } else if (type == DataType.cdouble()) {
- return current.getDouble(index);
- } else if (type == DataType.cfloat()) {
- return current.getFloat(index);
- } else if (type == DataType.uuid() || type == DataType.timeuuid()) {
- return current.getUUID(index).toString();
- } else {
- return null;
+ private Object currentRowField(int index) {
+ final Object o = current.get(index,
+ CassandraSchema.CODEC_REGISTRY.codecFor(
+ current.getColumnDefinitions().getType(index)));
+
+ return convertToEnumeratorObject(o);
+ }
+
+ /** Convert an object into the expected internal representation.
+ *
+ * @param obj Object to convert, if needed
+ */
+ private Object convertToEnumeratorObject(Object obj) {
+ if (obj instanceof ByteBuffer) {
+ ByteBuffer buf = (ByteBuffer) obj;
+ byte [] bytes = new byte[buf.remaining()];
+ buf.get(bytes, 0, bytes.length);
+ return new ByteString(bytes);
+ } else if (obj instanceof LocalDate) {
+ // converts dates to the expected numeric format
+ return ((LocalDate) obj).getMillisSinceEpoch()
+ / DateTimeUtils.MILLIS_PER_DAY;
+ } else if (obj instanceof Date) {
+ return ((Date) obj).toInstant().toEpochMilli();
+ } else if (obj instanceof LinkedHashSet) {
+ // MULTISET is handled as an array
+ return ((LinkedHashSet) obj).toArray();
+ } else if (obj instanceof TupleValue) {
+ // STRUCT can be handled as an array
+ final TupleValue tupleValue = (TupleValue) obj;
+ int numComponents = tupleValue.getType().getComponentTypes().size();
+ return IntStream.range(0, numComponents)
+ .mapToObj(i ->
+ tupleValue.get(i,
+ CassandraSchema.CODEC_REGISTRY.codecFor(
+ tupleValue.getType().getComponentTypes().get(i)))
+ ).map(this::convertToEnumeratorObject)
+ .toArray();
}
+
+ return obj;
}
public boolean moveNext() {
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
index 2d80426..31bd014 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraFilter.java
@@ -181,9 +181,7 @@ public class CassandraFilter extends Filter implements CassandraRel {
*/
private static String literalValue(RexLiteral literal) {
Object value = literal.getValue2();
- StringBuilder buf = new StringBuilder();
- buf.append(value);
- return buf.toString();
+ return String.valueOf(value);
}
/** Translate a conjunctive predicate to a CQL string.
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
index 85c4c6d..b0a2458 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -19,6 +19,7 @@ package org.apache.calcite.adapter.cassandra;
import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
@@ -43,12 +44,14 @@ import org.apache.calcite.util.trace.CalciteTrace;
import com.datastax.driver.core.AbstractTableMetadata;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ClusteringOrder;
+import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.MaterializedViewMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TupleType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -58,6 +61,8 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Schema mapped onto a Cassandra column family
@@ -69,6 +74,10 @@ public class CassandraSchema extends AbstractSchema {
final String name;
final Hook.Closeable hook;
+ static final CodecRegistry CODEC_REGISTRY = CodecRegistry.DEFAULT_INSTANCE;
+ static final CqlToSqlTypeConversionRules CQL_TO_SQL_TYPE =
+ CqlToSqlTypeConversionRules.instance();
+
protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
private static final int DEFAULT_CASSANDRA_PORT = 9042;
@@ -160,28 +169,68 @@ public class CassandraSchema extends AbstractSchema {
new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
for (ColumnMetadata column : columns) {
- final String columnName = column.getName();
- final DataType type = column.getType();
-
- // TODO: This mapping of types can be done much better
- SqlTypeName typeName = SqlTypeName.ANY;
- if (type == DataType.uuid() || type == DataType.timeuuid()) {
- // We currently rely on this in CassandraFilter to detect UUID columns.
- // That is, these fixed length literals should be unquoted in CQL.
- typeName = SqlTypeName.CHAR;
- } else if (type == DataType.ascii() || type == DataType.text()
- || type == DataType.varchar()) {
- typeName = SqlTypeName.VARCHAR;
- } else if (type == DataType.cint() || type == DataType.varint()) {
- typeName = SqlTypeName.INTEGER;
- } else if (type == DataType.bigint()) {
- typeName = SqlTypeName.BIGINT;
- } else if (type == DataType.cdouble() || type == DataType.cfloat()
- || type == DataType.decimal()) {
- typeName = SqlTypeName.DOUBLE;
- }
+ final SqlTypeName typeName =
+ CQL_TO_SQL_TYPE.lookup(column.getType().getName());
+
+ switch (typeName) {
+ case ARRAY:
+ final SqlTypeName arrayInnerType = CQL_TO_SQL_TYPE.lookup(
+ column.getType().getTypeArguments().get(0).getName());
+
+ fieldInfo.add(column.getName(),
+ typeFactory.createArrayType(
+ typeFactory.createSqlType(arrayInnerType), -1))
+ .nullable(true);
+
+ break;
+ case MULTISET:
+ final SqlTypeName multiSetInnerType = CQL_TO_SQL_TYPE.lookup(
+ column.getType().getTypeArguments().get(0).getName());
+
+ fieldInfo.add(column.getName(),
+ typeFactory.createMultisetType(
+ typeFactory.createSqlType(multiSetInnerType), -1)
+ ).nullable(true);
+
+ break;
+ case MAP:
+ final List<DataType> types = column.getType().getTypeArguments();
+ final SqlTypeName keyType =
+ CQL_TO_SQL_TYPE.lookup(types.get(0).getName());
+ final SqlTypeName valueType =
+ CQL_TO_SQL_TYPE.lookup(types.get(1).getName());
+
+ fieldInfo.add(column.getName(),
+ typeFactory.createMapType(
+ typeFactory.createSqlType(keyType),
+ typeFactory.createSqlType(valueType))
+ ).nullable(true);
+
+ break;
+ case STRUCTURED:
+ assert DataType.Name.TUPLE == column.getType().getName();
+
+ final List<DataType> typeArgs =
+ ((TupleType) column.getType()).getComponentTypes();
+ final List<Map.Entry<String, RelDataType>> typesList =
+ IntStream.range(0, typeArgs.size())
+ .mapToObj(
+ i -> new Pair<>(
+ Integer.toString(i + 1), // 1 indexed (as ARRAY)
+ typeFactory.createSqlType(
+ CQL_TO_SQL_TYPE.lookup(typeArgs.get(i).getName()))))
+ .collect(Collectors.toList());
+
+ fieldInfo.add(column.getName(),
+ typeFactory.createStructType(typesList))
+ .nullable(true);
+
+ break;
+ default:
+ fieldInfo.add(column.getName(), typeName).nullable(true);
- fieldInfo.add(columnName, typeFactory.createSqlType(typeName)).nullable(true);
+ break;
+ }
}
return RelDataTypeImpl.proto(fieldInfo.build());
@@ -267,12 +316,15 @@ public class CassandraSchema extends AbstractSchema {
}
queryBuilder.append(Util.toString(columnNames, "", ", ", ""));
- queryBuilder.append(" FROM \"" + tableName + "\"");
+ queryBuilder.append(" FROM \"")
+ .append(tableName)
+ .append("\"");
// Get the where clause from the system schema
String whereQuery = "SELECT where_clause from system_schema.views "
+ "WHERE keyspace_name='" + keyspace + "' AND view_name='" + view.getName() + "'";
- queryBuilder.append(" WHERE " + session.execute(whereQuery).one().getString(0));
+ queryBuilder.append(" WHERE ")
+ .append(session.execute(whereQuery).one().getString(0));
// Parse and unparse the view query to get properly quoted field names
String query = queryBuilder.toString();
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
index fa7d0b1..8b4112a 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraTable.java
@@ -36,7 +36,6 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTableQueryable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
@@ -118,10 +117,9 @@ public class CassandraTable extends AbstractQueryableTable
final RelDataType rowType = getRowType(typeFactory);
Function1<String, Void> addField = fieldName -> {
- SqlTypeName typeName =
- rowType.getField(fieldName, true, false).getType().getSqlTypeName();
- fieldInfo.add(fieldName, typeFactory.createSqlType(typeName))
- .nullable(true);
+ RelDataType relDataType =
+ rowType.getField(fieldName, true, false).getType();
+ fieldInfo.add(fieldName, relDataType).nullable(true);
return null;
};
@@ -172,9 +170,11 @@ public class CassandraTable extends AbstractQueryableTable
// Build and issue the query and return an Enumerator over the results
StringBuilder queryBuilder = new StringBuilder("SELECT ");
- queryBuilder.append(selectString);
- queryBuilder.append(" FROM \"" + columnFamily + "\"");
- queryBuilder.append(whereClause);
+ queryBuilder.append(selectString)
+ .append(" FROM \"")
+ .append(columnFamily)
+ .append("\"")
+ .append(whereClause);
if (!order.isEmpty()) {
queryBuilder.append(Util.toString(order, " ORDER BY ", ", ", ""));
}
@@ -184,7 +184,8 @@ public class CassandraTable extends AbstractQueryableTable
limit += fetch;
}
if (limit > 0) {
- queryBuilder.append(" LIMIT " + limit);
+ queryBuilder.append(" LIMIT ")
+ .append(limit);
}
queryBuilder.append(" ALLOW FILTERING");
final String query = queryBuilder.toString();
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CqlToSqlTypeConversionRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CqlToSqlTypeConversionRules.java
new file mode 100644
index 0000000..b55683b
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CqlToSqlTypeConversionRules.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.cassandra;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.datastax.driver.core.DataType;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/**
+ * CqlToSqlTypeConversionRules defines mappings from CQL types to
+ * corresponding SQL types.
+ */
+public class CqlToSqlTypeConversionRules {
+ //~ Static fields/initializers ---------------------------------------------
+
+ private static final CqlToSqlTypeConversionRules INSTANCE =
+ new CqlToSqlTypeConversionRules();
+
+ //~ Instance fields --------------------------------------------------------
+
+ private final Map<DataType.Name, SqlTypeName> rules =
+ ImmutableMap.<DataType.Name, SqlTypeName>builder()
+ .put(DataType.Name.UUID, SqlTypeName.CHAR)
+ .put(DataType.Name.TIMEUUID, SqlTypeName.CHAR)
+
+ .put(DataType.Name.ASCII, SqlTypeName.VARCHAR)
+ .put(DataType.Name.TEXT, SqlTypeName.VARCHAR)
+ .put(DataType.Name.VARCHAR, SqlTypeName.VARCHAR)
+
+ .put(DataType.Name.INT, SqlTypeName.INTEGER)
+ .put(DataType.Name.VARINT, SqlTypeName.INTEGER)
+ .put(DataType.Name.BIGINT, SqlTypeName.BIGINT)
+ .put(DataType.Name.TINYINT, SqlTypeName.TINYINT)
+ .put(DataType.Name.SMALLINT, SqlTypeName.SMALLINT)
+
+ .put(DataType.Name.DOUBLE, SqlTypeName.DOUBLE)
+ .put(DataType.Name.FLOAT, SqlTypeName.REAL)
+ .put(DataType.Name.DECIMAL, SqlTypeName.DOUBLE)
+
+ .put(DataType.Name.BLOB, SqlTypeName.VARBINARY)
+
+ .put(DataType.Name.BOOLEAN, SqlTypeName.BOOLEAN)
+
+ .put(DataType.Name.COUNTER, SqlTypeName.BIGINT)
+
+ // number of nanoseconds since midnight
+ .put(DataType.Name.TIME, SqlTypeName.BIGINT)
+ .put(DataType.Name.DATE, SqlTypeName.DATE)
+ .put(DataType.Name.TIMESTAMP, SqlTypeName.TIMESTAMP)
+
+ .put(DataType.Name.MAP, SqlTypeName.MAP)
+ .put(DataType.Name.LIST, SqlTypeName.ARRAY)
+ .put(DataType.Name.SET, SqlTypeName.MULTISET)
+ .put(DataType.Name.TUPLE, SqlTypeName.STRUCTURED)
+ .build();
+
+ //~ Methods ----------------------------------------------------------------
+
+ /**
+ * Returns the
+ * {@link org.apache.calcite.util.Glossary#SINGLETON_PATTERN singleton}
+ * instance.
+ */
+ public static CqlToSqlTypeConversionRules instance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Returns a corresponding {@link SqlTypeName} for a given CQL type name.
+ *
+ * @param name the CQL type name to lookup
+ * @return a corresponding SqlTypeName if found, ANY otherwise
+ */
+ public SqlTypeName lookup(DataType.Name name) {
+ return rules.getOrDefault(name, SqlTypeName.ANY);
+ }
+}
diff --git a/cassandra/src/test/java/org/apache/calcite/test/AbstractCassandraAdapterTest.java b/cassandra/src/test/java/org/apache/calcite/test/AbstractCassandraAdapterTest.java
new file mode 100644
index 0000000..ea55f2d
--- /dev/null
+++ b/cassandra/src/test/java/org/apache/calcite/test/AbstractCassandraAdapterTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import org.apache.calcite.config.CalciteSystemProperty;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.Sources;
+import org.apache.calcite.util.TestUtil;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.cassandraunit.CassandraCQLUnit;
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.junit.BeforeClass;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for the {@code org.apache.calcite.adapter.cassandra} package.
+ *
+ * <p>Common initialization functions for Cassandra tests.
+ */
+public abstract class AbstractCassandraAdapterTest {
+
+ /** Returns connection properties for the model at the given resource path. */
+ static ImmutableMap<String, String> getDataset(String resourcePath) {
+ return ImmutableMap.of("model",
+ Sources.of(CassandraAdapterTest.class.getResource(resourcePath))
+ .file().getAbsolutePath());
+ }
+
+ /**
+ * Whether to run this test.
+ * <p>Enabled by default, unless explicitly disabled
+ * from command line ({@code -Dcalcite.test.cassandra=false}) or running on incompatible JDK
+ * version (see below).
+ *
+ * <p>As of this writing Cassandra 4.x is not yet released and we're using 3.x
+ * (which fails on JDK11+). All cassandra tests will be skipped if
+ * running on JDK11+.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-9608">CASSANDRA-9608</a>
+ * @return {@code true} if test is compatible with current environment,
+ * {@code false} otherwise
+ */
+ private static boolean enabled() {
+ final boolean enabled = CalciteSystemProperty.TEST_CASSANDRA.value();
+ Bug.upgrade("remove JDK version check once current adapter supports Cassandra 4.x");
+ final boolean compatibleJdk = TestUtil.getJavaMajorVersion() < 11;
+ return enabled && compatibleJdk;
+ }
+
+ static ExternalResource initCassandraIfEnabled(String dataSetLocation) {
+ if (!enabled()) {
+ // Return NOP resource (to avoid nulls)
+ return new ExternalResource() {
+ @Override public Statement apply(final Statement base, final Description description) {
+ return super.apply(base, description);
+ }
+ };
+ }
+
+ String configurationFileName = null; // use default one
+ // Apache Jenkins often fails with
+ // CassandraAdapterTest Cassandra daemon did not start within timeout (20 sec by default)
+ long startUpTimeoutMillis = TimeUnit.SECONDS.toMillis(60);
+
+ CassandraCQLUnit rule = new CassandraCQLUnit(
+ new ClassPathCQLDataSet(dataSetLocation),
+ configurationFileName,
+ startUpTimeoutMillis);
+
+ // This static init is necessary otherwise tests fail with CassandraUnit in IntelliJ (jdk10)
+ // should be called right after constructor
+ // NullPointerException for DatabaseDescriptor.getDiskFailurePolicy
+ // for more info see
+ // https://github.com/jsevellec/cassandra-unit/issues/249
+ // https://github.com/jsevellec/cassandra-unit/issues/221
+ DatabaseDescriptor.daemonInitialization();
+
+ return rule;
+ }
+
+ @BeforeClass
+ public static void setUp() {
+ // run tests only if explicitly enabled
+ assumeTrue("test explicitly disabled", enabled());
+ }
+}
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterDataTypesTest.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterDataTypesTest.java
new file mode 100644
index 0000000..f33ce42
--- /dev/null
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterDataTypesTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Tests for the {@code org.apache.calcite.adapter.cassandra} package related to data types.
+ *
+ * <p>Will start embedded cassandra cluster and populate it from local {@code datatypes.cql} file.
+ * All configuration files are located in test classpath.
+ *
+ * <p>Note that tests will be skipped if running on JDK11+
+ * (which is not yet supported by cassandra) see
+ * <a href="https://issues.apache.org/jira/browse/CASSANDRA-9608">CASSANDRA-9608</a>.
+ *
+ */
+
+@Execution(ExecutionMode.SAME_THREAD)
+public class CassandraAdapterDataTypesTest extends AbstractCassandraAdapterTest {
+
+ @ClassRule
+ public static final ExternalResource RULE =
+ initCassandraIfEnabled("datatypes.cql");
+
+ /** Connection properties based on the "model-datatypes.json" model. */
+ private static final ImmutableMap<String, String> DTCASSANDRA =
+ getDataset("/model-datatypes.json");
+
+ @Test public void testSimpleTypesRowType() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_simple\"")
+ .typeIs("[f_int INTEGER"
+ + ", f_ascii VARCHAR"
+ + ", f_bigint BIGINT"
+ + ", f_blob VARBINARY"
+ + ", f_boolean BOOLEAN"
+ + ", f_date DATE"
+ + ", f_decimal DOUBLE"
+ + ", f_double DOUBLE"
+ + ", f_duration ANY"
+ + ", f_float REAL"
+ + ", f_inet ANY"
+ + ", f_smallint SMALLINT"
+ + ", f_text VARCHAR"
+ + ", f_time BIGINT"
+ + ", f_timestamp TIMESTAMP"
+ + ", f_timeuuid CHAR"
+ + ", f_tinyint TINYINT"
+ + ", f_uuid CHAR"
+ + ", f_varchar VARCHAR"
+ + ", f_varint INTEGER]");
+ }
+
+ @Test public void testSimpleTypesValues() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_simple\"")
+ .returns("f_int=0"
+ + "; f_ascii=abcdefg"
+ + "; f_bigint=3000000000"
+ + "; f_blob=20"
+ + "; f_boolean=true"
+ + "; f_date=2015-05-03"
+ + "; f_decimal=2.1"
+ + "; f_double=2.0"
+ + "; f_duration=89h9m9s"
+ + "; f_float=5.1"
+ + "; f_inet=/192.168.0.1"
+ + "; f_smallint=5"
+ + "; f_text=abcdefg"
+ + "; f_time=48654234000000"
+ + "; f_timestamp=2011-02-03 04:05:00"
+ + "; f_timeuuid=8ac6d1dc-fbeb-11e9-8f0b-362b9e155667"
+ + "; f_tinyint=0"
+ + "; f_uuid=123e4567-e89b-12d3-a456-426655440000"
+ + "; f_varchar=abcdefg"
+ + "; f_varint=10\n");
+ }
+
+ @Test public void testCounterRowType() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_counter\"")
+ .typeIs("[f_int INTEGER, f_counter BIGINT]");
+ }
+
+ @Test public void testCounterValues() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_counter\"")
+ .returns("f_int=1; f_counter=1\n");
+ }
+
+ @Test public void testCollectionsRowType() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_collections\"")
+ .typeIs("[f_int INTEGER"
+ + ", f_list INTEGER ARRAY"
+ + ", f_map (VARCHAR, VARCHAR) MAP"
+ + ", f_set DOUBLE MULTISET"
+ + ", f_tuple STRUCT]");
+ }
+
+ @Test public void testCollectionsValues() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_collections\"")
+ .returns("f_int=0"
+ + "; f_list=[1, 2, 3]"
+ + "; f_map={k1=v1, k2=v2}"
+ + "; f_set=[2.0, 3.1]"
+ + "; f_tuple={3000000000, 30ff87, 2015-05-03 13:30:54.234}"
+ + "\n");
+ }
+
+ @Test public void testCollectionsInnerRowType() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select \"f_list\"[1], "
+ + "\"f_map\"['k1'], "
+ + "\"f_tuple\"['1'], "
+ + "\"f_tuple\"['2'], "
+ + "\"f_tuple\"['3']"
+ + " from \"test_collections\"")
+ .typeIs("[EXPR$0 INTEGER"
+ + ", EXPR$1 VARCHAR"
+ + ", EXPR$2 BIGINT"
+ + ", EXPR$3 VARBINARY"
+ + ", EXPR$4 TIMESTAMP]");
+ }
+
+ // ignored as tuple elements return 'null' when accessed in the select statement
+ @Ignore
+ @Test public void testCollectionsInnerValues() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select \"f_list\"[1], "
+ + "\"f_map\"['k1'], "
+ + "\"f_tuple\"['1'], "
+ + "\"f_tuple\"['2'], "
+ + "\"f_tuple\"['3']"
+ + " from \"test_collections\"")
+ .returns("EXPR$0=1"
+ + "; EXPR$1=v1"
+ + "; EXPR$2=3000000000"
+ + "; EXPR$3=30ff87"
+ + "; EXPR$4=2015-05-03 13:30:54.234");
+ }
+
+ // frozen collections should not affect the row type
+ @Test public void testFrozenCollectionsRowType() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_frozen_collections\"")
+ .typeIs("[f_int INTEGER"
+ + ", f_list INTEGER ARRAY"
+ + ", f_map (VARCHAR, VARCHAR) MAP"
+ + ", f_set DOUBLE MULTISET"
+ + ", f_tuple STRUCT]");
+ // we should test (BIGINT, VARBINARY, TIMESTAMP) STRUCT but inner types are not exposed
+ }
+
+ // frozen collections should not affect the result set
+ @Test public void testFrozenCollectionsValues() {
+ CalciteAssert.that()
+ .with(DTCASSANDRA)
+ .query("select * from \"test_frozen_collections\"")
+ .returns("f_int=0"
+ + "; f_list=[1, 2, 3]"
+ + "; f_map={k1=v1, k2=v2}"
+ + "; f_set=[2.0, 3.1]"
+ + "; f_tuple={3000000000, 30ff87, 2015-05-03 13:30:54.234}"
+ + "\n");
+ }
+}
diff --git a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java
index fcbc88c..92f018b 100644
--- a/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java
+++ b/cassandra/src/test/java/org/apache/calcite/test/CassandraAdapterTest.java
@@ -16,29 +16,13 @@
*/
package org.apache.calcite.test;
-import org.apache.calcite.config.CalciteSystemProperty;
-import org.apache.calcite.util.Bug;
-import org.apache.calcite.util.Sources;
-import org.apache.calcite.util.TestUtil;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
import com.google.common.collect.ImmutableMap;
-import org.cassandraunit.CassandraCQLUnit;
-import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.rules.ExternalResource;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assume.assumeTrue;
/**
* Tests for the {@code org.apache.calcite.adapter.cassandra} package.
@@ -52,79 +36,17 @@ import static org.junit.Assume.assumeTrue;
*
*/
-// force tests to run sequentially (maven surefire and failsafe are running them in parallel)
-// seems like some of our code is sharing static variables (like Hooks) which causes tests
-// to fail non-deterministically (flaky tests).
@Execution(ExecutionMode.SAME_THREAD)
-public class CassandraAdapterTest {
+public class CassandraAdapterTest extends AbstractCassandraAdapterTest {
@ClassRule
- public static final ExternalResource RULE = initCassandraIfEnabled();
+ public static final ExternalResource RULE =
+ initCassandraIfEnabled("twissandra.cql");
/** Connection factory based on the "mongo-zips" model. */
private static final ImmutableMap<String, String> TWISSANDRA =
- ImmutableMap.of("model",
- Sources.of(
- CassandraAdapterTest.class.getResource("/model.json"))
- .file().getAbsolutePath());
-
- /**
- * Whether to run this test.
- * <p>Enabled by default, unless explicitly disabled
- * from command line ({@code -Dcalcite.test.cassandra=false}) or running on incompatible JDK
- * version (see below).
- *
- * <p>As of this writing Cassandra 4.x is not yet released and we're using 3.x
- * (which fails on JDK11+). All cassandra tests will be skipped if
- * running on JDK11+.
- *
- * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-9608">CASSANDRA-9608</a>
- * @return {@code true} if test is compatible with current environment,
- * {@code false} otherwise
- */
- private static boolean enabled() {
- final boolean enabled = CalciteSystemProperty.TEST_CASSANDRA.value();
- Bug.upgrade("remove JDK version check once current adapter supports Cassandra 4.x");
- final boolean compatibleJdk = TestUtil.getJavaMajorVersion() < 11;
- return enabled && compatibleJdk;
- }
-
- private static ExternalResource initCassandraIfEnabled() {
- if (!enabled()) {
- // Return NOP resource (to avoid nulls)
- return new ExternalResource() {
- @Override public Statement apply(final Statement base, final Description description) {
- return super.apply(base, description);
- }
- };
- }
-
- String configurationFileName = null; // use default one
- // Apache Jenkins often fails with
- // CassandraAdapterTest Cassandra daemon did not start within timeout (20 sec by default)
- long startUpTimeoutMillis = TimeUnit.SECONDS.toMillis(60);
-
- CassandraCQLUnit rule = new CassandraCQLUnit(
- new ClassPathCQLDataSet("twissandra.cql"),
- configurationFileName,
- startUpTimeoutMillis);
-
- // This static init is necessary otherwise tests fail with CassandraUnit in IntelliJ (jdk10)
- // should be called right after constructor
- // NullPointerException for DatabaseDescriptor.getDiskFailurePolicy
- // for more info see
- // https://github.com/jsevellec/cassandra-unit/issues/249
- // https://github.com/jsevellec/cassandra-unit/issues/221
- DatabaseDescriptor.daemonInitialization();
-
- return rule;
- }
+ getDataset("/model.json");
- @BeforeClass
- public static void setUp() {
- // run tests only if explicitly enabled
- assumeTrue("test explicitly disabled", enabled());
- }
@Test public void testSelect() {
CalciteAssert.that()
diff --git a/cassandra/src/test/resources/datatypes.cql b/cassandra/src/test/resources/datatypes.cql
new file mode 100644
index 0000000..045ee08
--- /dev/null
+++ b/cassandra/src/test/resources/datatypes.cql
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE KEYSPACE dtcassandra
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+
+USE dtcassandra;
+
+CREATE TABLE test_simple (
+ f_int int PRIMARY KEY,
+ f_uuid uuid,
+ f_timeuuid timeuuid,
+ f_ascii ascii,
+ f_text text,
+ f_varchar varchar,
+ f_varint varint,
+ f_bigint bigint,
+ f_double double,
+ f_float float,
+ f_decimal decimal,
+ f_blob blob,
+ f_boolean boolean,
+ f_date date,
+ f_inet inet,
+ f_smallint smallint,
+ f_time time,
+ f_timestamp timestamp,
+ f_tinyint tinyint,
+ f_duration duration
+);
+
+INSERT INTO test_simple(f_int,
+ f_uuid,
+ f_timeuuid,
+ f_ascii,
+ f_text,
+ f_varchar,
+ f_varint,
+ f_bigint,
+ f_double,
+ f_float,
+ f_decimal,
+ f_blob,
+ f_boolean,
+ f_date,
+ f_inet,
+ f_smallint,
+ f_time,
+ f_timestamp,
+ f_tinyint,
+ f_duration) VALUES (0,
+ 123e4567-e89b-12d3-a456-426655440000,
+ 8ac6d1dc-fbeb-11e9-8f0b-362b9e155667,
+ 'abcdefg',
+ 'abcdefg',
+ 'abcdefg',
+ 10,
+ 3000000000,
+ 2.0,
+ 5.1,
+ 2.1,
+ 0x20,
+ true,
+ '2015-05-03',
+ '192.168.0.1',
+ 5,
+ '13:30:54.234',
+ '2011-02-03T04:05:00.000+0000',
+ 0,
+ P0000-00-00T89:09:09);
+
+
+CREATE TABLE test_counter ( f_counter counter, f_int int PRIMARY KEY );
+
+UPDATE test_counter SET f_counter = f_counter + 1 WHERE f_int = 1;
+
+
+CREATE TABLE test_collections (
+ f_int int PRIMARY KEY,
+ f_list list<int>,
+ f_map map<text, text>,
+ f_set set<double>,
+ f_tuple tuple<bigint, blob, timestamp>
+);
+
+INSERT INTO test_collections (f_int, f_list, f_map, f_set, f_tuple) VALUES (0,
+ [1,2,3],
+ {'k1':'v1', 'k2':'v2'},
+ {2.0, 3.1},
+ (3000000000, 0x30FF87, '2015-05-03 13:30:54.234'));
+
+
+CREATE TABLE test_frozen_collections (
+ f_int int PRIMARY KEY,
+ f_list frozen<list<int>>,
+ f_map frozen<map<text, text>>,
+ f_set frozen<set<double>>,
+ f_tuple frozen<tuple<bigint, blob, timestamp>>
+);
+
+INSERT INTO test_frozen_collections (f_int, f_list, f_map, f_set, f_tuple) VALUES (0,
+ [1,2,3],
+ {'k1':'v1', 'k2':'v2'},
+ {2.0, 3.1},
+ (3000000000, 0x30FF87, '2015-05-03 13:30:54.234'));
diff --git a/cassandra/src/test/resources/model-datatypes.json b/cassandra/src/test/resources/model-datatypes.json
new file mode 100644
index 0000000..7386122
--- /dev/null
+++ b/cassandra/src/test/resources/model-datatypes.json
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "version": "1.0",
+ "defaultSchema": "dtcassandra",
+ "schemas": [
+ {
+ "name": "dtcassandra",
+ "type": "custom",
+ "factory": "org.apache.calcite.adapter.cassandra.CassandraSchemaFactory",
+ "operand": {
+ "host": "localhost",
+ "port": 9142,
+ "keyspace": "dtcassandra"
+ }
+ }
+ ]
+}