You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by as...@apache.org on 2022/02/18 08:39:01 UTC

[calcite] branch master updated: [CALCITE-5011] CassandraAdapterDataTypesTest fails with initialization error

This is an automated email from the ASF dual-hosted git repository.

asolimando pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new e81cd20  [CALCITE-5011] CassandraAdapterDataTypesTest fails with initialization error
e81cd20 is described below

commit e81cd20bfbf3cdc8fa430663b8f971119193cc3e
Author: Alessandro Solimando <al...@gmail.com>
AuthorDate: Tue Feb 15 15:43:44 2022 +0100

    [CALCITE-5011] CassandraAdapterDataTypesTest fails with initialization error
    
    Cache Cassandra sessions based on "hostname, port, keyspace, username, password" information.
---
 .../adapter/cassandra/CassandraEnumerator.java     |  5 +-
 .../calcite/adapter/cassandra/CassandraSchema.java | 78 +++-------------------
 .../adapter/cassandra/CassandraSchemaFactory.java  | 75 ++++++++++++++++++---
 3 files changed, 76 insertions(+), 82 deletions(-)

diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
index e99e151..1cf187a 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraEnumerator.java
@@ -27,6 +27,7 @@ import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import com.datastax.oss.driver.api.core.cql.ResultSet;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.api.core.data.TupleValue;
+import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -87,7 +88,7 @@ class CassandraEnumerator implements Enumerator<Object> {
   private @Nullable Object currentRowField(int index) {
     assert current != null;
     final Object o =  current.get(index,
-        CassandraSchema.CODEC_REGISTRY.codecFor(
+        CodecRegistry.DEFAULT.codecFor(
             current.getColumnDefinitions().get(index).getType()));
 
     return convertToEnumeratorObject(o);
@@ -124,7 +125,7 @@ class CassandraEnumerator implements Enumerator<Object> {
       return IntStream.range(0, numComponents)
           .mapToObj(i ->
               tupleValue.get(i,
-                  CassandraSchema.CODEC_REGISTRY.codecFor(
+                  CodecRegistry.DEFAULT.codecFor(
                       tupleValue.getType().getComponentTypes().get(i)))
           ).map(this::convertToEnumeratorObject)
           .map(Objects::requireNonNull) // "null" cannot appear inside collections
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
index f3a3eff..de774cf 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchema.java
@@ -54,13 +54,10 @@ import com.datastax.oss.driver.api.core.type.ListType;
 import com.datastax.oss.driver.api.core.type.MapType;
 import com.datastax.oss.driver.api.core.type.SetType;
 import com.datastax.oss.driver.api.core.type.TupleType;
-import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
 import com.google.common.collect.ImmutableMap;
 
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -80,83 +77,24 @@ public class CassandraSchema extends AbstractSchema {
   final String name;
   final Hook.Closeable hook;
 
-  static final CodecRegistry CODEC_REGISTRY = CodecRegistry.DEFAULT;
   static final CqlToSqlTypeConversionRules CQL_TO_SQL_TYPE =
       CqlToSqlTypeConversionRules.instance();
 
   protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
 
-  private static final int DEFAULT_CASSANDRA_PORT = 9042;
-
-  /**
-   * Creates a Cassandra schema.
-   *
-   * @param host Cassandra host, e.g. "localhost"
-   * @param keyspace Cassandra keyspace name, e.g. "twissandra"
-   */
-  @SuppressWarnings("unused")
-  public CassandraSchema(String host, String keyspace, SchemaPlus parentSchema, String name) {
-    this(host, DEFAULT_CASSANDRA_PORT, keyspace, null, null, parentSchema, name);
-  }
-
   /**
    * Creates a Cassandra schema.
    *
-   * @param host Cassandra host, e.g. "localhost"
-   * @param port Cassandra port, e.g. 9042
-   * @param keyspace Cassandra keyspace name, e.g. "twissandra"
+   * @param session a Cassandra session
+   * @param parentSchema the parent schema
+   * @param name the schema name
    */
-  @SuppressWarnings("unused")
-  public CassandraSchema(String host, int port, String keyspace,
-      SchemaPlus parentSchema, String name) {
-    this(host, port, keyspace, null, null, parentSchema, name);
-  }
-
-  /**
-   * Creates a Cassandra schema.
-   *
-   * @param host Cassandra host, e.g. "localhost"
-   * @param keyspace Cassandra keyspace name, e.g. "twissandra"
-   * @param username Cassandra username
-   * @param password Cassandra password
-   */
-  public CassandraSchema(String host, String keyspace, @Nullable String username,
-      @Nullable String password, SchemaPlus parentSchema, String name) {
-    this(host, DEFAULT_CASSANDRA_PORT, keyspace, username, password, parentSchema, name);
-  }
-
-  /**
-   * Creates a Cassandra schema.
-   *
-   * @param host Cassandra host, e.g. "localhost"
-   * @param port Cassandra port, e.g. 9042
-   * @param keyspace Cassandra keyspace name, e.g. "twissandra"
-   * @param username Cassandra username
-   * @param password Cassandra password
-   */
-  public CassandraSchema(String host, int port, String keyspace, @Nullable String username,
-      @Nullable String password, SchemaPlus parentSchema, String name) {
+  public CassandraSchema(CqlSession session, SchemaPlus parentSchema, String name) {
     super();
-
-    this.keyspace = keyspace;
-    try {
-      if (username != null && password != null) {
-        this.session = CqlSession.builder()
-            .addContactPoint(new InetSocketAddress(host, port))
-            .withAuthCredentials(username, password)
-            .withKeyspace(keyspace)
-            .withLocalDatacenter("datacenter1")
-            .build();
-      } else {
-        this.session = CqlSession.builder()
-            .addContactPoint(new InetSocketAddress(host, port))
-            .withKeyspace(keyspace)
-            .withLocalDatacenter("datacenter1")
-            .build();
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    this.session = session;
+    this.keyspace = session.getKeyspace()
+        .orElseThrow(() -> new RuntimeException("No keyspace for session " + session.getName()))
+        .asInternal();
     this.parentSchema = parentSchema;
     this.name = name;
     this.hook = prepareHook();
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
index bed42e0..8aa796c 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraSchemaFactory.java
@@ -19,36 +19,91 @@ package org.apache.calcite.adapter.cassandra;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaFactory;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.util.trace.CalciteTrace;
 
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+
+import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * Factory that creates a {@link CassandraSchema}.
  */
 @SuppressWarnings("UnusedDeclaration")
 public class CassandraSchemaFactory implements SchemaFactory {
+
+  private static final int DEFAULT_CASSANDRA_PORT = 9042;
+  private static final Map<Map<String, Object>, CqlSession> INFO_TO_SESSION =
+      new ConcurrentHashMap<>();
+  private static final Set<String> SESSION_DEFINING_KEYS = ImmutableSet.of(
+      "host", "port", "keyspace", "username", "password");
+  protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
+
   public CassandraSchemaFactory() {
+    super();
   }
 
   @Override public Schema create(SchemaPlus parentSchema, String name,
       Map<String, Object> operand) {
-    Map map = (Map) operand;
-    String host = (String) map.get("host");
-    String keyspace = (String) map.get("keyspace");
-    String username = (String) map.get("username");
-    String password = (String) map.get("password");
 
+    final Map<String, Object> sessionMap = projectMapOverKeys(operand, SESSION_DEFINING_KEYS);
+
+    INFO_TO_SESSION.computeIfAbsent(sessionMap, m -> {
+      String host = (String) m.get("host");
+      String keyspace = (String) m.get("keyspace");
+      String username = (String) m.get("username");
+      String password = (String) m.get("password");
+      int port = getPort(m);
+
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Creating session for info {}", m);
+      }
+      try {
+        if (username != null && password != null) {
+          return CqlSession.builder()
+              .addContactPoint(new InetSocketAddress(host, port))
+              .withAuthCredentials(username, password)
+              .withKeyspace(keyspace)
+              .withLocalDatacenter("datacenter1")
+              .build();
+        } else {
+          return CqlSession.builder()
+              .addContactPoint(new InetSocketAddress(host, port))
+              .withKeyspace(keyspace)
+              .withLocalDatacenter("datacenter1")
+              .build();
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    return new CassandraSchema(INFO_TO_SESSION.get(sessionMap), parentSchema, name);
+  }
+
+  private static Map<String, Object> projectMapOverKeys(
+      Map<String, Object> map, Set<String> keysToKeep) {
+    return map.entrySet().stream()
+        .filter(e -> keysToKeep.contains(e.getKey()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  private static int getPort(Map<String, Object> map) {
     if (map.containsKey("port")) {
       Object portObj = map.get("port");
-      int port;
       if (portObj instanceof String) {
-        port = Integer.parseInt((String) portObj);
+        return Integer.parseInt((String) portObj);
       } else {
-        port = (int) portObj;
+        return (int) portObj;
       }
-      return new CassandraSchema(host, port, keyspace, username, password, parentSchema, name);
     } else {
-      return new CassandraSchema(host, keyspace, username, password, parentSchema, name);
+      return DEFAULT_CASSANDRA_PORT;
     }
   }
 }