You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by as...@apache.org on 2022/02/18 08:39:01 UTC
[calcite] branch master updated: [CALCITE-5011] CassandraAdapterDataTypesTest fails with initialization error
This is an automated email from the ASF dual-hosted git repository.
asolimando pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new e81cd20 [CALCITE-5011] CassandraAdapterDataTypesTest fails with initialization error
e81cd20 is described below
commit e81cd20bfbf3cdc8fa430663b8f971119193cc3e
Author: Alessandro Solimando <al...@gmail.com>
AuthorDate: Tue Feb 15 15:43:44 2022 +0100
[CALCITE-5011] CassandraAdapterDataTypesTest fails with initialization error
Cache Cassandra sessions based on "hostname, port, keyspace, username, password" information.
---
.../adapter/cassandra/CassandraEnumerator.java | 5 +-
.../calcite/adapter/cassandra/CassandraSchema.java | 78 +++-------------------
.../adapter/cassandra/CassandraSchemaFactory.java | 75 ++++++++++++++++++---
3 files changed, 76 insertions(+), 82 deletions(-)
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
index e99e151..1cf187a 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
@@ -27,6 +27,7 @@ import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.TupleValue;
+import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -87,7 +88,7 @@ class CassandraEnumerator implements Enumerator<Object> {
private @Nullable Object currentRowField(int index) {
assert current != null;
final Object o = current.get(index,
- CassandraSchema.CODEC_REGISTRY.codecFor(
+ CodecRegistry.DEFAULT.codecFor(
current.getColumnDefinitions().get(index).getType()));
return convertToEnumeratorObject(o);
@@ -124,7 +125,7 @@ class CassandraEnumerator implements Enumerator<Object> {
return IntStream.range(0, numComponents)
.mapToObj(i ->
tupleValue.get(i,
- CassandraSchema.CODEC_REGISTRY.codecFor(
+ CodecRegistry.DEFAULT.codecFor(
tupleValue.getType().getComponentTypes().get(i)))
).map(this::convertToEnumeratorObject)
.map(Objects::requireNonNull) // "null" cannot appear inside collections
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
index f3a3eff..de774cf 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -54,13 +54,10 @@ import com.datastax.oss.driver.api.core.type.ListType;
import com.datastax.oss.driver.api.core.type.MapType;
import com.datastax.oss.driver.api.core.type.SetType;
import com.datastax.oss.driver.api.core.type.TupleType;
-import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.collect.ImmutableMap;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -80,83 +77,24 @@ public class CassandraSchema extends AbstractSchema {
final String name;
final Hook.Closeable hook;
- static final CodecRegistry CODEC_REGISTRY = CodecRegistry.DEFAULT;
static final CqlToSqlTypeConversionRules CQL_TO_SQL_TYPE =
CqlToSqlTypeConversionRules.instance();
protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
- private static final int DEFAULT_CASSANDRA_PORT = 9042;
-
- /**
- * Creates a Cassandra schema.
- *
- * @param host Cassandra host, e.g. "localhost"
- * @param keyspace Cassandra keyspace name, e.g. "twissandra"
- */
- @SuppressWarnings("unused")
- public CassandraSchema(String host, String keyspace, SchemaPlus parentSchema, String name) {
- this(host, DEFAULT_CASSANDRA_PORT, keyspace, null, null, parentSchema, name);
- }
-
/**
* Creates a Cassandra schema.
*
- * @param host Cassandra host, e.g. "localhost"
- * @param port Cassandra port, e.g. 9042
- * @param keyspace Cassandra keyspace name, e.g. "twissandra"
+ * @param session a Cassandra session
+ * @param parentSchema the parent schema
+ * @param name the schema name
*/
- @SuppressWarnings("unused")
- public CassandraSchema(String host, int port, String keyspace,
- SchemaPlus parentSchema, String name) {
- this(host, port, keyspace, null, null, parentSchema, name);
- }
-
- /**
- * Creates a Cassandra schema.
- *
- * @param host Cassandra host, e.g. "localhost"
- * @param keyspace Cassandra keyspace name, e.g. "twissandra"
- * @param username Cassandra username
- * @param password Cassandra password
- */
- public CassandraSchema(String host, String keyspace, @Nullable String username,
- @Nullable String password, SchemaPlus parentSchema, String name) {
- this(host, DEFAULT_CASSANDRA_PORT, keyspace, username, password, parentSchema, name);
- }
-
- /**
- * Creates a Cassandra schema.
- *
- * @param host Cassandra host, e.g. "localhost"
- * @param port Cassandra port, e.g. 9042
- * @param keyspace Cassandra keyspace name, e.g. "twissandra"
- * @param username Cassandra username
- * @param password Cassandra password
- */
- public CassandraSchema(String host, int port, String keyspace, @Nullable String username,
- @Nullable String password, SchemaPlus parentSchema, String name) {
+ public CassandraSchema(CqlSession session, SchemaPlus parentSchema, String name) {
super();
-
- this.keyspace = keyspace;
- try {
- if (username != null && password != null) {
- this.session = CqlSession.builder()
- .addContactPoint(new InetSocketAddress(host, port))
- .withAuthCredentials(username, password)
- .withKeyspace(keyspace)
- .withLocalDatacenter("datacenter1")
- .build();
- } else {
- this.session = CqlSession.builder()
- .addContactPoint(new InetSocketAddress(host, port))
- .withKeyspace(keyspace)
- .withLocalDatacenter("datacenter1")
- .build();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ this.session = session;
+ this.keyspace = session.getKeyspace()
+ .orElseThrow(() -> new RuntimeException("No keyspace for session " + session.getName()))
+ .asInternal();
this.parentSchema = parentSchema;
this.name = name;
this.hook = prepareHook();
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
index bed42e0..8aa796c 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
@@ -19,36 +19,91 @@ package org.apache.calcite.adapter.cassandra;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.trace.CalciteTrace;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+
+import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/**
* Factory that creates a {@link CassandraSchema}.
*/
@SuppressWarnings("UnusedDeclaration")
public class CassandraSchemaFactory implements SchemaFactory {
+
+ private static final int DEFAULT_CASSANDRA_PORT = 9042;
+ private static final Map<Map<String, Object>, CqlSession> INFO_TO_SESSION =
+ new ConcurrentHashMap<>();
+ private static final Set<String> SESSION_DEFINING_KEYS = ImmutableSet.of(
+ "host", "port", "keyspace", "username", "password");
+ protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
+
public CassandraSchemaFactory() {
+ super();
}
@Override public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
- Map map = (Map) operand;
- String host = (String) map.get("host");
- String keyspace = (String) map.get("keyspace");
- String username = (String) map.get("username");
- String password = (String) map.get("password");
+ final Map<String, Object> sessionMap = projectMapOverKeys(operand, SESSION_DEFINING_KEYS);
+
+ INFO_TO_SESSION.computeIfAbsent(sessionMap, m -> {
+ String host = (String) m.get("host");
+ String keyspace = (String) m.get("keyspace");
+ String username = (String) m.get("username");
+ String password = (String) m.get("password");
+ int port = getPort(m);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Creating session for info {}", m);
+ }
+ try {
+ if (username != null && password != null) {
+ return CqlSession.builder()
+ .addContactPoint(new InetSocketAddress(host, port))
+ .withAuthCredentials(username, password)
+ .withKeyspace(keyspace)
+ .withLocalDatacenter("datacenter1")
+ .build();
+ } else {
+ return CqlSession.builder()
+ .addContactPoint(new InetSocketAddress(host, port))
+ .withKeyspace(keyspace)
+ .withLocalDatacenter("datacenter1")
+ .build();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ return new CassandraSchema(INFO_TO_SESSION.get(sessionMap), parentSchema, name);
+ }
+
+ private static Map<String, Object> projectMapOverKeys(
+ Map<String, Object> map, Set<String> keysToKeep) {
+ return map.entrySet().stream()
+ .filter(e -> keysToKeep.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ private static int getPort(Map<String, Object> map) {
if (map.containsKey("port")) {
Object portObj = map.get("port");
- int port;
if (portObj instanceof String) {
- port = Integer.parseInt((String) portObj);
+ return Integer.parseInt((String) portObj);
} else {
- port = (int) portObj;
+ return (int) portObj;
}
- return new CassandraSchema(host, port, keyspace, username, password, parentSchema, name);
} else {
- return new CassandraSchema(host, keyspace, username, password, parentSchema, name);
+ return DEFAULT_CASSANDRA_PORT;
}
}
}