You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/06/12 08:07:53 UTC
[06/13] calcite git commit: [CALCITE-1281] Druid adapter wrongly
returns all numeric values as int or float
[CALCITE-1281] Druid adapter wrongly returns all numeric values as int or float
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/ec49a0fa
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/ec49a0fa
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/ec49a0fa
Branch: refs/heads/master
Commit: ec49a0fa37195bb4b34945b53ce39b27d558d6ab
Parents: 435e203
Author: Julian Hyde <jh...@apache.org>
Authored: Wed Jun 8 17:03:17 2016 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Jun 8 17:35:03 2016 -0700
----------------------------------------------------------------------
.../adapter/druid/DruidConnectionImpl.java | 72 +++++++++++++++-----
.../calcite/adapter/druid/DruidQuery.java | 28 +++++++-
.../org/apache/calcite/test/DruidAdapterIT.java | 33 +++++++--
3 files changed, 112 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index dccda9f..7520d70 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -22,6 +22,7 @@ import org.apache.calcite.interpreter.Sink;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Holder;
@@ -40,6 +41,7 @@ import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -58,13 +60,24 @@ class DruidConnectionImpl implements DruidConnection {
private final String url;
private final String coordinatorUrl;
- public DruidConnectionImpl(String url, String coordinatorUrl) {
+ DruidConnectionImpl(String url, String coordinatorUrl) {
this.url = Preconditions.checkNotNull(url);
this.coordinatorUrl = Preconditions.checkNotNull(coordinatorUrl);
}
+ /** Executes a query request.
+ *
+ * @param queryType Query type
+ * @param data Data to post
+ * @param sink Sink to which to send the parsed rows
+ * @param fieldNames Names of fields
+ * @param fieldTypes Types of fields (never null, but elements may be null)
+ * @param page Page definition (in/out)
+ * @throws IOException on error
+ */
public void request(QueryType queryType, String data, Sink sink,
- List<String> fieldNames, Page page) throws IOException {
+ List<String> fieldNames, List<Primitive> fieldTypes, Page page)
+ throws IOException {
final String url = this.url + "/druid/v2/?pretty";
final Map<String, String> requestHeaders =
ImmutableMap.of("Content-Type", "application/json");
@@ -73,14 +86,14 @@ class DruidConnectionImpl implements DruidConnection {
}
try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000);
InputStream in = traceResponse(in0)) {
- parse(queryType, in, sink, fieldNames, page);
+ parse(queryType, in, sink, fieldNames, fieldTypes, page);
}
}
/** Parses the output of a {@code topN} query, sending the results to a
* {@link Sink}. */
private void parse(QueryType queryType, InputStream in, Sink sink,
- List<String> fieldNames, Page page) {
+ List<String> fieldNames, List<Primitive> fieldTypes, Page page) {
final JsonFactory factory = new JsonFactory();
final Row.RowBuilder rowBuilder = Row.newBuilder(fieldNames.size());
@@ -105,7 +118,7 @@ class DruidConnectionImpl implements DruidConnection {
&& parser.nextToken() == JsonToken.START_ARRAY) {
while (parser.nextToken() == JsonToken.START_OBJECT) {
// loop until token equal to "}"
- parseFields(fieldNames, rowBuilder, parser);
+ parseFields(fieldNames, fieldTypes, rowBuilder, parser);
sink.send(rowBuilder.build());
rowBuilder.reset();
}
@@ -145,7 +158,7 @@ class DruidConnectionImpl implements DruidConnection {
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("event")
&& parser.nextToken() == JsonToken.START_OBJECT) {
- parseFields(fieldNames, rowBuilder, parser);
+ parseFields(fieldNames, fieldTypes, rowBuilder, parser);
sink.send(rowBuilder.build());
rowBuilder.reset();
}
@@ -165,7 +178,7 @@ class DruidConnectionImpl implements DruidConnection {
if (parser.nextToken() == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals("event")
&& parser.nextToken() == JsonToken.START_OBJECT) {
- parseFields(fieldNames, rowBuilder, parser);
+ parseFields(fieldNames, fieldTypes, rowBuilder, parser);
sink.send(rowBuilder.build());
rowBuilder.reset();
}
@@ -178,15 +191,15 @@ class DruidConnectionImpl implements DruidConnection {
}
}
- private void parseFields(List<String> fieldNames, Row.RowBuilder rowBuilder,
- JsonParser parser) throws IOException {
+ private void parseFields(List<String> fieldNames, List<Primitive> fieldTypes,
+ Row.RowBuilder rowBuilder, JsonParser parser) throws IOException {
while (parser.nextToken() == JsonToken.FIELD_NAME) {
- parseField(fieldNames, rowBuilder, parser);
+ parseField(fieldNames, fieldTypes, rowBuilder, parser);
}
}
- private void parseField(List<String> fieldNames, Row.RowBuilder rowBuilder,
- JsonParser parser) throws IOException {
+ private void parseField(List<String> fieldNames, List<Primitive> fieldTypes,
+ Row.RowBuilder rowBuilder, JsonParser parser) throws IOException {
final String fieldName = parser.getCurrentName();
// Move to next token, which is name's value
@@ -197,10 +210,35 @@ class DruidConnectionImpl implements DruidConnection {
}
switch (token) {
case VALUE_NUMBER_INT:
- rowBuilder.set(i, parser.getIntValue());
- break;
case VALUE_NUMBER_FLOAT:
- rowBuilder.set(i, parser.getDoubleValue());
+ Primitive type = fieldTypes.get(i);
+ if (type == null) {
+ if (token == JsonToken.VALUE_NUMBER_INT) {
+ type = Primitive.INT;
+ } else {
+ type = Primitive.FLOAT;
+ }
+ }
+ switch (type) {
+ case BYTE:
+ rowBuilder.set(i, parser.getIntValue());
+ break;
+ case SHORT:
+ rowBuilder.set(i, parser.getShortValue());
+ break;
+ case INT:
+ rowBuilder.set(i, parser.getIntValue());
+ break;
+ case LONG:
+ rowBuilder.set(i, parser.getLongValue());
+ break;
+ case FLOAT:
+ rowBuilder.set(i, parser.getFloatValue());
+ break;
+ case DOUBLE:
+ rowBuilder.set(i, parser.getDoubleValue());
+ break;
+ }
break;
case VALUE_TRUE:
rowBuilder.set(i, true);
@@ -287,7 +325,9 @@ class DruidConnectionImpl implements DruidConnection {
public void run() {
try {
final Page page = new Page();
- request(queryType, request, this, fieldNames, page);
+ final List<Primitive> fieldTypes =
+ Collections.nCopies(fieldNames.size(), null);
+ request(queryType, request, this, fieldNames, fieldTypes, page);
enumerator.done.set(true);
} catch (Throwable e) {
enumerator.throwableHolder.set(e);
http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 9a858f9..a3fdf78 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -24,6 +24,7 @@ import org.apache.calcite.interpreter.Interpreter;
import org.apache.calcite.interpreter.Node;
import org.apache.calcite.interpreter.Sink;
import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
@@ -41,6 +42,7 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
@@ -674,6 +676,10 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
}
public void run() throws InterruptedException {
+ final List<Primitive> fieldTypes = new ArrayList<>();
+ for (RelDataTypeField field : query.getRowType().getFieldList()) {
+ fieldTypes.add(getPrimitive(field));
+ }
try {
final DruidConnectionImpl connection =
new DruidConnectionImpl(query.druidTable.schema.url,
@@ -685,12 +691,32 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
final String queryString =
querySpec.getQueryString(page.pagingIdentifier, page.offset);
connection.request(querySpec.queryType, queryString, sink,
- querySpec.fieldNames, page);
+ querySpec.fieldNames, fieldTypes, page);
} while (page.pagingIdentifier != null && page.offset > previousOffset);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
+
+ private Primitive getPrimitive(RelDataTypeField field) {
+ switch (field.getType().getSqlTypeName()) {
+ case BIGINT:
+ return Primitive.LONG;
+ case INTEGER:
+ return Primitive.INT;
+ case SMALLINT:
+ return Primitive.SHORT;
+ case TINYINT:
+ return Primitive.BYTE;
+ case REAL:
+ return Primitive.FLOAT;
+ case DOUBLE:
+ case FLOAT:
+ return Primitive.DOUBLE;
+ default:
+ return null;
+ }
+ }
}
/** Object that knows how to write itself to a
http://git-wip-us.apache.org/repos/asf/calcite/blob/ec49a0fa/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index d59ef24..fee3e3d 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -200,6 +200,29 @@ public class DruidAdapterIT {
.explainContains(explain);
}
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-1281">[CALCITE-1281]
+ * Druid adapter wrongly returns all numeric values as int or float</a>. */
+ @Test public void testSelectCount() {
+ final String sql = "select count(*) as c from \"foodmart\"";
+ sql(sql)
+ .returns(new Function<ResultSet, Void>() {
+ public Void apply(ResultSet input) {
+ try {
+ assertThat(input.next(), is(true));
+ assertThat(input.getInt(1), is(86829));
+ assertThat(input.getLong(1), is(86829L));
+ assertThat(input.getString(1), is("86829"));
+ assertThat(input.wasNull(), is(false));
+ assertThat(input.next(), is(false));
+ return null;
+ } catch (SQLException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ });
+ }
+
@Test public void testSort() {
// Note: We do not push down SORT yet
final String explain = "PLAN="
@@ -455,7 +478,6 @@ public class DruidAdapterIT {
"C=21610; state_province=OR");
}
- @Ignore("TODO: fix invalid cast from Integer to Long")
@Test public void testGroupByAvgSumCount() {
final String sql = "select \"state_province\",\n"
+ " avg(\"unit_sales\") as a,\n"
@@ -467,11 +489,14 @@ public class DruidAdapterIT {
+ "order by 1";
String druidQuery = "'aggregations':["
+ "{'type':'longSum','name':'$f1','fieldName':'unit_sales'},"
- + "{'type':'count','name':'$f2'}]";
+ + "{'type':'count','name':'$f2','fieldName':'unit_sales'},"
+ + "{'type':'count','name':'C','fieldName':'store_sqft'},"
+ + "{'type':'count','name':'C0'}],"
+ + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
sql(sql)
.limit(2)
- .returnsUnordered("state_province=CA; A=3; S=74748; C=23190; C0=23190",
- "state_province=OR; A=3; S=67659; C=19027; C0=19027")
+ .returnsUnordered("state_province=CA; A=3; S=74748; C=24441; C0=24441",
+ "state_province=OR; A=3; S=67659; C=21610; C0=21610")
.queryContains(druidChecker(druidQuery));
}