You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2021/03/13 07:11:37 UTC

[calcite] 02/04: [CALCITE-4418] Allow Interpreter to read from JDBC input

This is an automated email from the ASF dual-hosted git repository.

jhyde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit fc6d56f1ac6d590b8a332cb0931a71ecd2367969
Author: Julian Hyde <jh...@apache.org>
AuthorDate: Mon Nov 23 11:22:30 2020 -0800

    [CALCITE-4418] Allow Interpreter to read from JDBC input
    
    The main changes concern whether we shift the time zone of
    DATE, TIME and TIMESTAMP values when we convert them from
    JDBC types (java.sql.Date, Time and Timestamp) to internal
    types (int, int, long) and back again.
---
 .../apache/calcite/adapter/clone/ColumnLoader.java |   3 +
 .../calcite/adapter/enumerable/EnumUtils.java      |  63 +++++++++
 .../org/apache/calcite/adapter/jdbc/JdbcTable.java |  13 +-
 .../org/apache/calcite/adapter/jdbc/JdbcUtils.java | 152 ++++++++++++++-------
 .../org/apache/calcite/test/CalciteAssert.java     |  13 +-
 .../org/apache/calcite/test/InterpreterTest.java   |  38 +++++-
 6 files changed, 218 insertions(+), 64 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/adapter/clone/ColumnLoader.java b/core/src/main/java/org/apache/calcite/adapter/clone/ColumnLoader.java
index 20e7171..006698a 100644
--- a/core/src/main/java/org/apache/calcite/adapter/clone/ColumnLoader.java
+++ b/core/src/main/java/org/apache/calcite/adapter/clone/ColumnLoader.java
@@ -241,6 +241,9 @@ class ColumnLoader<T> {
    * {@link Integer}. */
   private static List<? extends @Nullable Object> wrap(ColumnMetaData.Rep rep, List<?> list,
       RelDataType type) {
+    if (true) {
+      return list;
+    }
     switch (type.getSqlTypeName()) {
     case TIMESTAMP:
       switch (rep) {
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index 65aa54d..f797706 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
@@ -62,6 +63,9 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.text.Collator;
 import java.util.AbstractList;
 import java.util.ArrayDeque;
@@ -72,6 +76,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.TimeZone;
+import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
 
@@ -1140,4 +1146,61 @@ public class EnumUtils {
             Expressions.constant(locale.getVariant())),
         Expressions.constant(strength));
   }
+
+  /** Creates a converter from the internal representation of a value of the
+   * given type to its external representation.
+   *
+   * <p>Internal datetime values have no time zone, whereas the external
+   * {@link Date}, {@link Time} and {@link Timestamp} values are moments
+   * (relative to UTC epoch); {@code timeZone} supplies the implicit time zone
+   * of the internal value. If it is the local time zone of the JVM, then
+   * {@link Timestamp#toString}, {@link Date#toString()} and
+   * {@link Time#toString()} on the external values are consistent with the
+   * internal values. Values of any other type are returned unchanged. */
+  public static Function<Object, Object> toExternal(RelDataType type,
+      TimeZone timeZone) {
+    switch (type.getSqlTypeName()) {
+    case DATE:
+      return o -> {
+        // Internal value: days since epoch, as an Integer.
+        final long millis = ((Integer) o) * DateTimeUtils.MILLIS_PER_DAY;
+        return new Date(millis - timeZone.getOffset(millis));
+      };
+    case TIME:
+      return o -> {
+        // Internal value: milliseconds of day, as an Integer.
+        final long millis = (Integer) o;
+        final long shifted = millis - timeZone.getOffset(millis);
+        return new Time(shifted % DateTimeUtils.MILLIS_PER_DAY);
+      };
+    case TIMESTAMP:
+      return o -> {
+        // Internal value: milliseconds since epoch, as a Long.
+        final long millis = (Long) o;
+        return new Timestamp(millis - timeZone.getOffset(millis));
+      };
+    default:
+      return Function.identity();
+    }
+  }
+
+  /** Returns a function that converts an array of internal values to
+   * a list of external values, converting each non-null element by ordinal.
+   * Each call returns a new list of {@code types.size()} elements. */
+  @SuppressWarnings("unchecked")
+  public static Function<@Nullable Object[], List<@Nullable Object>> toExternal(
+      List<RelDataType> types, TimeZone timeZone) {
+    final Function<Object, Object>[] functions = new Function[types.size()];
+    for (int i = 0; i < types.size(); i++) {
+      functions[i] = toExternal(types.get(i), timeZone);
+    }
+    return values -> {
+      // Allocate per call: a buffer shared between calls would leak stale values.
+      final @Nullable Object[] objects = new @Nullable Object[functions.length];
+      for (int i = 0; i < values.length; i++) {
+        objects[i] = values[i] == null ? null : functions[i].apply(values[i]);
+      }
+      return Arrays.asList(objects);
+    };
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcTable.java b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcTable.java
index 4151470..2a68285 100644
--- a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcTable.java
+++ b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcTable.java
@@ -183,7 +183,7 @@ public class JdbcTable extends AbstractQueryableTable
     JavaTypeFactory typeFactory = root.getTypeFactory();
     final SqlString sql = generateSql();
     return ResultSetEnumerable.of(jdbcSchema.getDataSource(), sql.getSql(),
-        JdbcUtils.ObjectArrayRowBuilder.factory(fieldClasses(typeFactory)));
+        JdbcUtils.rowBuilderFactory2(fieldClasses(typeFactory)));
   }
 
   @Override public @Nullable Collection getModifiableCollection() {
@@ -219,11 +219,12 @@ public class JdbcTable extends AbstractQueryableTable
       final JavaTypeFactory typeFactory =
           ((CalciteConnection) queryProvider).getTypeFactory();
       final SqlString sql = generateSql();
-      //noinspection unchecked
-      final Enumerable<T> enumerable = (Enumerable<T>) ResultSetEnumerable.of(
-          jdbcSchema.getDataSource(),
-          sql.getSql(),
-          JdbcUtils.ObjectArrayRowBuilder.factory(fieldClasses(typeFactory)));
+      final List<Pair<ColumnMetaData.Rep, Integer>> pairs =
+          fieldClasses(typeFactory);
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      final Enumerable<T> enumerable =
+          (Enumerable) ResultSetEnumerable.of(jdbcSchema.getDataSource(),
+              sql.getSql(), JdbcUtils.rowBuilderFactory2(pairs));
       return enumerable.enumerator();
     }
   }
diff --git a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcUtils.java b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcUtils.java
index bdda910..ef48b50 100644
--- a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcUtils.java
@@ -24,6 +24,7 @@ import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlDialectFactory;
 import org.apache.calcite.util.ImmutableNullableList;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
 
 import org.apache.commons.dbcp2.BasicDataSource;
 
@@ -33,7 +34,6 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.primitives.Ints;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.checkerframework.checker.nullness.qual.PolyNull;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -55,6 +55,28 @@ final class JdbcUtils {
     throw new AssertionError("no instances!");
   }
 
+  /** Creates a factory that, applied to a {@link ResultSet}, returns an
+   * {@link ObjectArrayRowBuilder1} yielding successive rows of that result set. */
+  static Function1<ResultSet, Function0<@Nullable Object[]>> rowBuilderFactory(
+      final List<Pair<ColumnMetaData.Rep, Integer>> list) {
+    final int[] jdbcTypes = Ints.toArray(Pair.right(list));
+    final ColumnMetaData.Rep[] columnReps =
+        Pair.left(list).toArray(new ColumnMetaData.Rep[0]);
+    return rs -> new ObjectArrayRowBuilder1(rs, columnReps, jdbcTypes);
+  }
+
+  /** Creates a factory that, applied to a {@link ResultSet}, returns an
+   * {@link ObjectArrayRowBuilder2} yielding successive rows of that result
+   * set. Like {@link #rowBuilderFactory(List)}, except that values are in
+   * Calcite's internal format (e.g. DATE represented as int). */
+  static Function1<ResultSet, Function0<@Nullable Object[]>> rowBuilderFactory2(
+      final List<Pair<ColumnMetaData.Rep, Integer>> list) {
+    final int[] jdbcTypes = Ints.toArray(Pair.right(list));
+    final ColumnMetaData.Rep[] columnReps =
+        Pair.left(list).toArray(new ColumnMetaData.Rep[0]);
+    return rs -> new ObjectArrayRowBuilder2(rs, columnReps, jdbcTypes);
+  }
+
   /** Pool of dialects. */
   static class DialectPool {
     public static final DialectPool INSTANCE = new DialectPool();
@@ -98,33 +120,23 @@ final class JdbcUtils {
   /** Builder that calls {@link ResultSet#getObject(int)} for every column,
    * or {@code getXxx} if the result type is a primitive {@code xxx},
    * and returns an array of objects for each row. */
-  static class ObjectArrayRowBuilder implements Function0<@Nullable Object[]> {
-    private final ResultSet resultSet;
-    private final int columnCount;
-    private final ColumnMetaData.Rep[] reps;
-    private final int[] types;
+  abstract static class ObjectArrayRowBuilder
+      implements Function0<@Nullable Object[]> {
+    protected final ResultSet resultSet;
+    protected final int columnCount;
+    protected final ColumnMetaData.Rep[] reps;
+    protected final int[] types;
 
     ObjectArrayRowBuilder(ResultSet resultSet, ColumnMetaData.Rep[] reps,
-        int[] types)
-        throws SQLException {
+        int[] types) {
       this.resultSet = resultSet;
       this.reps = reps;
       this.types = types;
-      this.columnCount = resultSet.getMetaData().getColumnCount();
-    }
-
-    public static Function1<ResultSet, Function0<@Nullable Object[]>> factory(
-        final List<Pair<ColumnMetaData.Rep, Integer>> list) {
-      return resultSet -> {
-        try {
-          return new ObjectArrayRowBuilder(
-              resultSet,
-              Pair.left(list).toArray(new ColumnMetaData.Rep[0]),
-              Ints.toArray(Pair.right(list)));
-        } catch (SQLException e) {
-          throw new RuntimeException(e);
-        }
-      };
+      try {
+        this.columnCount = resultSet.getMetaData().getColumnCount();
+      } catch (SQLException e) {
+        throw Util.throwAsRuntime(e);
+      }
     }
 
     @Override public @Nullable Object[] apply() {
@@ -144,54 +156,92 @@ final class JdbcUtils {
      *
      * @param i Ordinal of column (1-based, per JDBC)
      */
-    private @Nullable Object value(int i) throws SQLException {
+    protected abstract @Nullable Object value(int i) throws SQLException;
+
+    long timestampToLong(Timestamp v) {
+      return v.getTime();
+    }
+
+    long timeToLong(Time v) {
+      return v.getTime();
+    }
+
+    long dateToLong(Date v) {
+      return v.getTime();
+    }
+  }
+
+  /** Row builder that shifts DATE, TIME, TIMESTAMP values into local time
+   * zone. */
+  static class ObjectArrayRowBuilder1 extends ObjectArrayRowBuilder {
+    final TimeZone timeZone = TimeZone.getDefault();
+
+    ObjectArrayRowBuilder1(ResultSet resultSet, ColumnMetaData.Rep[] reps,
+        int[] types) {
+      super(resultSet, reps, types);
+    }
+
+    @Override protected @Nullable Object value(int i) throws SQLException {
       // MySQL returns timestamps shifted into local time. Using
       // getTimestamp(int, Calendar) with a UTC calendar should prevent this,
       // but does not. So we shift explicitly.
       switch (types[i]) {
       case Types.TIMESTAMP:
-        return shift(resultSet.getTimestamp(i + 1));
+        final Timestamp timestamp = resultSet.getTimestamp(i + 1);
+        return timestamp == null ? null : new Timestamp(timestampToLong(timestamp));
       case Types.TIME:
-        return shift(resultSet.getTime(i + 1));
+        final Time time = resultSet.getTime(i + 1);
+        return time == null ? null : new Time(timeToLong(time));
       case Types.DATE:
-        return shift(resultSet.getDate(i + 1));
+        final Date date = resultSet.getDate(i + 1);
+        return date == null ? null : new Date(dateToLong(date));
       default:
         break;
       }
       return reps[i].jdbcGet(resultSet, i + 1);
     }
 
-    /** Returns a timestamp shifted by the default time-zone's offset;
-     * null if and only if {@code v} is null. */
-    private static @PolyNull Timestamp shift(@PolyNull Timestamp v) {
-      if (v == null) {
-        return null;
-      }
+    @Override long timestampToLong(Timestamp v) {
       long time = v.getTime();
-      int offset = TimeZone.getDefault().getOffset(time);
-      return new Timestamp(time + offset);
+      int offset = timeZone.getOffset(time);
+      return time + offset;
     }
 
-    /** Returns a time shifted by the default time-zone's offset;
-     * null if and only if {@code v} is null. */
-    private static @PolyNull Time shift(@PolyNull Time v) {
-      if (v == null) {
-        return null;
-      }
+    @Override long timeToLong(Time v) {
       long time = v.getTime();
-      int offset = TimeZone.getDefault().getOffset(time);
-      return new Time((time + offset) % DateTimeUtils.MILLIS_PER_DAY);
+      int offset = timeZone.getOffset(time);
+      return (time + offset) % DateTimeUtils.MILLIS_PER_DAY;
     }
 
-    /** Returns a date shifted by the default time-zone's offset;
-     * null if and only if {@code v} is null. */
-    private static @PolyNull Date shift(@PolyNull Date v) {
-      if (v == null) {
-        return null;
-      }
+    @Override long dateToLong(Date v) {
       long time = v.getTime();
-      int offset = TimeZone.getDefault().getOffset(time);
-      return new Date(time + offset);
+      int offset = timeZone.getOffset(time);
+      return time + offset;
+    }
+  }
+
+  /** Row builder that returns DATE, TIME, TIMESTAMP values in internal form. */
+  static class ObjectArrayRowBuilder2 extends ObjectArrayRowBuilder1 {
+    ObjectArrayRowBuilder2(ResultSet resultSet, ColumnMetaData.Rep[] reps,
+        int[] types) {
+      super(resultSet, reps, types);
+    }
+
+    @Override protected @Nullable Object value(int i) throws SQLException {
+      switch (types[i]) {
+      case Types.TIMESTAMP: // shifted milliseconds, as a Long
+        final Timestamp timestamp = resultSet.getTimestamp(i + 1);
+        return timestamp == null ? null : timestampToLong(timestamp);
+      case Types.TIME: // shifted milliseconds, narrowed to an Integer
+        final Time time = resultSet.getTime(i + 1);
+        return time == null ? null : (int) timeToLong(time);
+      case Types.DATE: // floorDiv: pre-1970 values must round down, not toward zero
+        final Date date = resultSet.getDate(i + 1);
+        return date == null ? null
+            : (int) Math.floorDiv(dateToLong(date), DateTimeUtils.MILLIS_PER_DAY);
+      default:
+        return reps[i].jdbcGet(resultSet, i + 1);
+      }
     }
   }
 
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
index 6c04ba8..7bda2e1 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
@@ -745,7 +745,18 @@ public class CalciteAssert {
     throw new AssertionError("method " + methodName + " not found");
   }
 
-  public static SchemaPlus addSchema(SchemaPlus rootSchema, SchemaSpec schema) {
+  /** Adds each of the given schema specifications to the root schema, in
+   * order; returns the last one created, or the root schema if none given. */
+  public static SchemaPlus addSchema(SchemaPlus rootSchema,
+      SchemaSpec... schemas) {
+    SchemaPlus result = rootSchema;
+    for (SchemaSpec spec : schemas) {
+      result = addSchema_(rootSchema, spec);
+    }
+    return result;
+  }
+
+  static SchemaPlus addSchema_(SchemaPlus rootSchema, SchemaSpec schema) {
     final SchemaPlus foodmart;
     final SchemaPlus jdbcScott;
     final SchemaPlus scott;
diff --git a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
index 72b40e2..f24659b 100644
--- a/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/InterpreterTest.java
@@ -17,7 +17,9 @@
 package org.apache.calcite.test;
 
 import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.interpreter.Interpreter;
 import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.plan.hep.HepPlanner;
@@ -26,6 +28,8 @@ import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
@@ -46,6 +50,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -123,8 +128,7 @@ class InterpreterTest {
         SqlNode validate = planner.validate(parse);
         final RelRoot root = planner.rel(validate);
         RelNode convert = project ? root.project() : root.rel;
-        final Interpreter interpreter = new Interpreter(dataContext, convert);
-        assertRows(interpreter, unordered, rows);
+        assertInterpret(convert, dataContext, unordered, rows);
         return this;
       } catch (ValidationException
           | SqlParseException
@@ -144,7 +148,9 @@ class InterpreterTest {
     final FrameworkConfig config = Frameworks.newConfigBuilder()
         .parserConfig(SqlParser.Config.DEFAULT)
         .defaultSchema(
-            CalciteAssert.addSchema(rootSchema, CalciteAssert.SchemaSpec.HR))
+            CalciteAssert.addSchema(rootSchema,
+                CalciteAssert.SchemaSpec.JDBC_SCOTT,
+                CalciteAssert.SchemaSpec.HR))
         .build();
     planner = Frameworks.getPlanner(config);
     dataContext = new MyDataContext(planner);
@@ -189,11 +195,23 @@ class InterpreterTest {
     sql(sql).withProject(true).returnsRows("[[a, b, c]]");
   }
 
+  /** Interprets {@code rel}, converts its rows to external form, checks them. */
+  private static void assertInterpret(RelNode rel, DataContext dataContext,
+      boolean unordered, String... rows) {
+    final Interpreter interpreter = new Interpreter(dataContext, rel);
+    final List<RelDataType> types = Util.transform(
+        rel.getRowType().getFieldList(), RelDataTypeField::getType);
+    final Function<Object[], List<Object>> converter =
+        EnumUtils.toExternal(types, DateTimeUtils.DEFAULT_ZONE);
+    assertRows(interpreter, converter, unordered, rows);
+  }
+
   private static void assertRows(Interpreter interpreter,
+      Function<Object[], List<Object>> converter,
       boolean unordered, String... rows) {
     final List<String> list = new ArrayList<>();
     for (Object[] row : interpreter) {
-      list.add(Arrays.toString(row));
+      list.add(converter.apply(row).toString());
     }
     final List<String> expected = Arrays.asList(rows);
     if (unordered) {
@@ -433,8 +451,7 @@ class InterpreterTest {
       final HepPlanner hepPlanner = new HepPlanner(program);
       hepPlanner.setRoot(convert);
       final RelNode relNode = hepPlanner.findBestExp();
-      final Interpreter interpreter = new Interpreter(dataContext, relNode);
-      assertRows(interpreter, true, "[1, a]", "[3, c]");
+      assertInterpret(relNode, dataContext, true, "[1, a]", "[3, c]");
     } catch (ValidationException
         | SqlParseException
         | RelConversionException e) {
@@ -513,4 +530,13 @@ class InterpreterTest {
       assertThat(e.getMessage(), equalTo("NULL value for unnest."));
     }
   }
+
+  @Test void testInterpretJdbc() {
+    final String sql = "select empno, hiredate from jdbc_scott.emp";
+    sql(sql).returnsRows("[7369, 1980-12-17]", "[7499, 1981-02-20]",
+        "[7521, 1981-02-22]", "[7566, 1981-02-04]", "[7654, 1981-09-28]",
+        "[7698, 1981-01-05]", "[7782, 1981-06-09]", "[7788, 1987-04-19]",
+        "[7839, 1981-11-17]", "[7844, 1981-09-08]", "[7876, 1987-05-23]",
+        "[7900, 1981-12-03]", "[7902, 1981-12-03]", "[7934, 1982-01-23]");
+  }
 }