You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/06/09 00:46:11 UTC

[3/6] calcite git commit: [CALCITE-1281] Druid adapter wrongly returns all numeric values as int or float

[CALCITE-1281] Druid adapter wrongly returns all numeric values as int or float


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/ec49a0fa
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/ec49a0fa
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/ec49a0fa

Branch: refs/heads/branch-1.8
Commit: ec49a0fa37195bb4b34945b53ce39b27d558d6ab
Parents: 435e203
Author: Julian Hyde <jh...@apache.org>
Authored: Wed Jun 8 17:03:17 2016 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Jun 8 17:35:03 2016 -0700

----------------------------------------------------------------------
 .../adapter/druid/DruidConnectionImpl.java      | 72 +++++++++++++++-----
 .../calcite/adapter/druid/DruidQuery.java       | 28 +++++++-
 .../org/apache/calcite/test/DruidAdapterIT.java | 33 +++++++--
 3 files changed, 112 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index dccda9f..7520d70 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -22,6 +22,7 @@ import org.apache.calcite.interpreter.Sink;
 import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Holder;
@@ -40,6 +41,7 @@ import com.google.common.collect.ImmutableSet;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -58,13 +60,24 @@ class DruidConnectionImpl implements DruidConnection {
   private final String url;
   private final String coordinatorUrl;
 
-  public DruidConnectionImpl(String url, String coordinatorUrl) {
+  DruidConnectionImpl(String url, String coordinatorUrl) {
     this.url = Preconditions.checkNotNull(url);
     this.coordinatorUrl = Preconditions.checkNotNull(coordinatorUrl);
   }
 
+  /** Executes a query request.
+   *
+   * @param queryType Query type
+   * @param data Data to post
+   * @param sink Sink to which to send the parsed rows
+   * @param fieldNames Names of fields
+   * @param fieldTypes Types of fields (never null, but elements may be null)
+   * @param page Page definition (in/out)
+   * @throws IOException on error
+   */
   public void request(QueryType queryType, String data, Sink sink,
-      List<String> fieldNames, Page page) throws IOException {
+      List<String> fieldNames, List<Primitive> fieldTypes, Page page)
+      throws IOException {
     final String url = this.url + "/druid/v2/?pretty";
     final Map<String, String> requestHeaders =
         ImmutableMap.of("Content-Type", "application/json");
@@ -73,14 +86,14 @@ class DruidConnectionImpl implements DruidConnection {
     }
     try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000);
          InputStream in = traceResponse(in0)) {
-      parse(queryType, in, sink, fieldNames, page);
+      parse(queryType, in, sink, fieldNames, fieldTypes, page);
     }
   }
 
   /** Parses the output of a {@code topN} query, sending the results to a
    * {@link Sink}. */
   private void parse(QueryType queryType, InputStream in, Sink sink,
-      List<String> fieldNames, Page page) {
+      List<String> fieldNames, List<Primitive> fieldTypes, Page page) {
     final JsonFactory factory = new JsonFactory();
     final Row.RowBuilder rowBuilder = Row.newBuilder(fieldNames.size());
 
@@ -105,7 +118,7 @@ class DruidConnectionImpl implements DruidConnection {
               && parser.nextToken() == JsonToken.START_ARRAY) {
             while (parser.nextToken() == JsonToken.START_OBJECT) {
               // loop until token equal to "}"
-              parseFields(fieldNames, rowBuilder, parser);
+              parseFields(fieldNames, fieldTypes, rowBuilder, parser);
               sink.send(rowBuilder.build());
               rowBuilder.reset();
             }
@@ -145,7 +158,7 @@ class DruidConnectionImpl implements DruidConnection {
                 if (parser.nextToken() == JsonToken.FIELD_NAME
                     && parser.getCurrentName().equals("event")
                     && parser.nextToken() == JsonToken.START_OBJECT) {
-                  parseFields(fieldNames, rowBuilder, parser);
+                  parseFields(fieldNames, fieldTypes, rowBuilder, parser);
                   sink.send(rowBuilder.build());
                   rowBuilder.reset();
                 }
@@ -165,7 +178,7 @@ class DruidConnectionImpl implements DruidConnection {
             if (parser.nextToken() == JsonToken.FIELD_NAME
                 && parser.getCurrentName().equals("event")
                 && parser.nextToken() == JsonToken.START_OBJECT) {
-              parseFields(fieldNames, rowBuilder, parser);
+              parseFields(fieldNames, fieldTypes, rowBuilder, parser);
               sink.send(rowBuilder.build());
               rowBuilder.reset();
             }
@@ -178,15 +191,15 @@ class DruidConnectionImpl implements DruidConnection {
     }
   }
 
-  private void parseFields(List<String> fieldNames, Row.RowBuilder rowBuilder,
-      JsonParser parser) throws IOException {
+  private void parseFields(List<String> fieldNames, List<Primitive> fieldTypes,
+      Row.RowBuilder rowBuilder, JsonParser parser) throws IOException {
     while (parser.nextToken() == JsonToken.FIELD_NAME) {
-      parseField(fieldNames, rowBuilder, parser);
+      parseField(fieldNames, fieldTypes, rowBuilder, parser);
     }
   }
 
-  private void parseField(List<String> fieldNames, Row.RowBuilder rowBuilder,
-      JsonParser parser) throws IOException {
+  private void parseField(List<String> fieldNames, List<Primitive> fieldTypes,
+      Row.RowBuilder rowBuilder, JsonParser parser) throws IOException {
     final String fieldName = parser.getCurrentName();
 
     // Move to next token, which is name's value
@@ -197,10 +210,35 @@ class DruidConnectionImpl implements DruidConnection {
     }
     switch (token) {
     case VALUE_NUMBER_INT:
-      rowBuilder.set(i, parser.getIntValue());
-      break;
     case VALUE_NUMBER_FLOAT:
-      rowBuilder.set(i, parser.getDoubleValue());
+      Primitive type = fieldTypes.get(i);
+      if (type == null) {
+        if (token == JsonToken.VALUE_NUMBER_INT) {
+          type = Primitive.INT;
+        } else {
+          type = Primitive.FLOAT;
+        }
+      }
+      switch (type) {
+      case BYTE:
+        rowBuilder.set(i, parser.getByteValue());
+        break;
+      case SHORT:
+        rowBuilder.set(i, parser.getShortValue());
+        break;
+      case INT:
+        rowBuilder.set(i, parser.getIntValue());
+        break;
+      case LONG:
+        rowBuilder.set(i, parser.getLongValue());
+        break;
+      case FLOAT:
+        rowBuilder.set(i, parser.getFloatValue());
+        break;
+      case DOUBLE:
+        rowBuilder.set(i, parser.getDoubleValue());
+        break;
+      }
       break;
     case VALUE_TRUE:
       rowBuilder.set(i, true);
@@ -287,7 +325,9 @@ class DruidConnectionImpl implements DruidConnection {
           public void run() {
             try {
               final Page page = new Page();
-              request(queryType, request, this, fieldNames, page);
+              final List<Primitive> fieldTypes =
+                  Collections.nCopies(fieldNames.size(), null);
+              request(queryType, request, this, fieldNames, fieldTypes, page);
               enumerator.done.set(true);
             } catch (Throwable e) {
               enumerator.throwableHolder.set(e);

http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 9a858f9..a3fdf78 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -24,6 +24,7 @@ import org.apache.calcite.interpreter.Interpreter;
 import org.apache.calcite.interpreter.Node;
 import org.apache.calcite.interpreter.Sink;
 import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -41,6 +42,7 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -674,6 +676,10 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     }
 
     public void run() throws InterruptedException {
+      final List<Primitive> fieldTypes = new ArrayList<>();
+      for (RelDataTypeField field : query.getRowType().getFieldList()) {
+        fieldTypes.add(getPrimitive(field));
+      }
       try {
         final DruidConnectionImpl connection =
             new DruidConnectionImpl(query.druidTable.schema.url,
@@ -685,12 +691,32 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
           final String queryString =
               querySpec.getQueryString(page.pagingIdentifier, page.offset);
           connection.request(querySpec.queryType, queryString, sink,
-              querySpec.fieldNames, page);
+              querySpec.fieldNames, fieldTypes, page);
         } while (page.pagingIdentifier != null && page.offset > previousOffset);
       } catch (IOException e) {
         throw Throwables.propagate(e);
       }
     }
+
+    private Primitive getPrimitive(RelDataTypeField field) {
+      switch (field.getType().getSqlTypeName()) {
+      case BIGINT:
+        return Primitive.LONG;
+      case INTEGER:
+        return Primitive.INT;
+      case SMALLINT:
+        return Primitive.SHORT;
+      case TINYINT:
+        return Primitive.BYTE;
+      case REAL:
+        return Primitive.FLOAT;
+      case DOUBLE:
+      case FLOAT:
+        return Primitive.DOUBLE;
+      default:
+        return null;
+      }
+    }
   }
 
   /** Object that knows how to write itself to a

http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index d59ef24..fee3e3d 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -200,6 +200,29 @@ public class DruidAdapterIT {
         .explainContains(explain);
   }
 
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-1281">[CALCITE-1281]
+   * Druid adapter wrongly returns all numeric values as int or float</a>. */
+  @Test public void testSelectCount() {
+    final String sql = "select count(*) as c from \"foodmart\"";
+    sql(sql)
+        .returns(new Function<ResultSet, Void>() {
+          public Void apply(ResultSet input) {
+            try {
+              assertThat(input.next(), is(true));
+              assertThat(input.getInt(1), is(86829));
+              assertThat(input.getLong(1), is(86829L));
+              assertThat(input.getString(1), is("86829"));
+              assertThat(input.wasNull(), is(false));
+              assertThat(input.next(), is(false));
+              return null;
+            } catch (SQLException e) {
+              throw Throwables.propagate(e);
+            }
+          }
+        });
+  }
+
   @Test public void testSort() {
     // Note: We do not push down SORT yet
     final String explain = "PLAN="
@@ -455,7 +478,6 @@ public class DruidAdapterIT {
             "C=21610; state_province=OR");
   }
 
-  @Ignore("TODO: fix invalid cast from Integer to Long")
   @Test public void testGroupByAvgSumCount() {
     final String sql = "select \"state_province\",\n"
         + " avg(\"unit_sales\") as a,\n"
@@ -467,11 +489,14 @@ public class DruidAdapterIT {
         + "order by 1";
     String druidQuery = "'aggregations':["
         + "{'type':'longSum','name':'$f1','fieldName':'unit_sales'},"
-        + "{'type':'count','name':'$f2'}]";
+        + "{'type':'count','name':'$f2','fieldName':'unit_sales'},"
+        + "{'type':'count','name':'C','fieldName':'store_sqft'},"
+        + "{'type':'count','name':'C0'}],"
+        + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
     sql(sql)
         .limit(2)
-        .returnsUnordered("state_province=CA; A=3; S=74748; C=23190; C0=23190",
-            "state_province=OR; A=3; S=67659; C=19027; C0=19027")
+        .returnsUnordered("state_province=CA; A=3; S=74748; C=24441; C0=24441",
+            "state_province=OR; A=3; S=67659; C=21610; C0=21610")
         .queryContains(druidChecker(druidQuery));
   }