Posted to commits@calcite.apache.org by jh...@apache.org on 2016/10/06 02:40:30 UTC
calcite git commit: [CALCITE-1403] Fix intervals and timezone offsets in DruidAdapterIT
Repository: calcite
Updated Branches:
refs/heads/master e5a42f1ca -> 73544eda3
[CALCITE-1403] Fix intervals and timezone offsets in DruidAdapterIT
Note that we now require Druid 0.9.1 or higher (to support granularities greater than "day").
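For example, grouping by floor("timestamp" to MONTH) now yields a timeseries query whose granularity is coarser than "day"; the expected query string from DruidAdapterIT in this diff is:

  String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart',"
      + "'descending':false,'granularity':'MONTH',"
      + "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
      + "{'type':'count','name':'C','fieldName':'store_sqft'}],"
      + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";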
In DruidQuery, emit "descending" as a boolean JSON value rather than a String.
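In Jackson terms, the change swaps writeStringField for writeBooleanField. A minimal stand-alone sketch (the demo class is illustrative; the real call sites are in DruidQuery below):

  import com.fasterxml.jackson.core.JsonFactory;
  import com.fasterxml.jackson.core.JsonGenerator;
  import java.io.StringWriter;

  public class DescendingFieldDemo {
    public static void main(String[] args) throws Exception {
      StringWriter sw = new StringWriter();
      try (JsonGenerator generator = new JsonFactory().createGenerator(sw)) {
        generator.writeStartObject();
        // Old behavior: the value was a JSON string -> "descendingOld":"false"
        generator.writeStringField("descendingOld", "false");
        // New behavior: the value is a JSON boolean -> "descending":false
        generator.writeBooleanField("descending", false);
        generator.writeEndObject();
      }
      System.out.println(sw); // {"descendingOld":"false","descending":false}
    }
  }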
When a Druid REST call fails, include the generated query in the error message.
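Concretely, request() no longer declares a checked IOException; the failure is rethrown with the request body attached. A minimal stand-alone sketch of the pattern (the class and the URL-based stand-in are illustrative, not part of the commit):

  import java.io.IOException;
  import java.io.InputStream;
  import java.net.URL;

  /** Illustrative only; a stand-in for DruidConnectionImpl's HTTP helper. */
  class DruidRequestExample {
    static InputStream send(String url, String jsonQuery) {
      try {
        // Stand-in for the adapter's POST helper; any IOException from the
        // network layer is wrapped below.
        return new URL(url).openStream();
      } catch (IOException e) {
        // Embed the generated query so a failing request can be re-run
        // against Druid directly from the error message.
        throw new RuntimeException(
            "Error while processing druid request [" + jsonQuery + "]", e);
      }
    }
  }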
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/73544eda
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/73544eda
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/73544eda
Branch: refs/heads/master
Commit: 73544eda3c043db65c82d79161856b00cf7744c6
Parents: e5a42f1
Author: Julian Hyde <jh...@apache.org>
Authored: Mon Oct 3 19:54:04 2016 -0700
Committer: Julian Hyde <jh...@apache.org>
Committed: Wed Oct 5 18:08:04 2016 -0700
----------------------------------------------------------------------
druid/pom.xml | 4 --
.../adapter/druid/DruidConnectionImpl.java | 8 ++--
.../adapter/druid/DruidDateTimeUtils.java | 45 +++++++------------
.../calcite/adapter/druid/DruidQuery.java | 47 +++++++++++---------
.../calcite/adapter/druid/DruidTable.java | 14 +++---
.../adapter/druid/DruidTableFactory.java | 12 +++--
.../org/apache/calcite/test/DruidAdapterIT.java | 20 ++++-----
7 files changed, 74 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/73544eda/druid/pom.xml
----------------------------------------------------------------------
diff --git a/druid/pom.xml b/druid/pom.xml
index 0564ce9..ee6e701 100644
--- a/druid/pom.xml
+++ b/druid/pom.xml
@@ -74,10 +74,6 @@ limitations under the License.
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/calcite/blob/73544eda/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index 2bfc663..21067db 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -90,11 +90,10 @@ class DruidConnectionImpl implements DruidConnection {
* @param fieldNames Names of fields
* @param fieldTypes Types of fields (never null, but elements may be null)
* @param page Page definition (in/out)
- * @throws IOException on error
*/
public void request(QueryType queryType, String data, Sink sink,
- List<String> fieldNames, List<ColumnMetaData.Rep> fieldTypes, Page page)
- throws IOException {
+ List<String> fieldNames, List<ColumnMetaData.Rep> fieldTypes,
+ Page page) {
final String url = this.url + "/druid/v2/?pretty";
final Map<String, String> requestHeaders =
ImmutableMap.of("Content-Type", "application/json");
@@ -104,6 +103,9 @@ class DruidConnectionImpl implements DruidConnection {
try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000);
InputStream in = traceResponse(in0)) {
parse(queryType, in, sink, fieldNames, fieldTypes, page);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while processing druid request ["
+ + data + "]", e);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/73544eda/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
index 2921b9f..65ade8e 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidDateTimeUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.calcite.adapter.druid;
+import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
@@ -25,8 +26,6 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.trace.CalciteTrace;
-import org.apache.commons.lang3.StringUtils;
-
import com.google.common.base.Function;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
@@ -34,19 +33,18 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;
-import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
import org.slf4j.Logger;
import java.sql.Date;
import java.sql.Timestamp;
-import java.text.DateFormat;
-import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
+import java.util.regex.Pattern;
/**
* Utilities for generating intervals from RexNode.
@@ -56,6 +54,10 @@ public class DruidDateTimeUtils {
protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
+ private static final Pattern TIMESTAMP_PATTERN =
+ Pattern.compile("[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]"
+ + " [0-9][0-9]:[0-9][0-9]:[0-9][0-9]");
+
private DruidDateTimeUtils() {
}
@@ -96,7 +98,7 @@ public class DruidDateTimeUtils {
if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) {
end++;
}
- return new Interval(start, end, DateTimeZone.UTC);
+ return new Interval(start, end, ISOChronology.getInstanceUTC());
}
});
if (LOGGER.isInfoEnabled()) {
@@ -321,25 +323,9 @@ public class DruidDateTimeUtils {
if (literal instanceof Timestamp) {
return (Timestamp) literal;
}
- if (literal instanceof Date) {
- return new Timestamp(((Date) literal).getTime());
- }
- if (literal instanceof Calendar) {
- return new Timestamp(((Calendar) literal).getTime().getTime());
- }
- if (literal instanceof Number) {
- return new Timestamp(((Number) literal).longValue());
- }
- if (literal instanceof String) {
- String string = (String) literal;
- if (StringUtils.isNumeric(string)) {
- return new Timestamp(Long.valueOf(string));
- }
- try {
- return Timestamp.valueOf(string);
- } catch (NumberFormatException e) {
- // ignore
- }
+ final Long v = toLong(literal);
+ if (v != null) {
+ return new Timestamp(v);
}
return null;
}
@@ -358,15 +344,14 @@ public class DruidDateTimeUtils {
return ((Calendar) literal).getTime().getTime();
}
if (literal instanceof String) {
+ final String s = (String) literal;
try {
- return Long.valueOf((String) literal);
+ return Long.valueOf(s);
} catch (NumberFormatException e) {
// ignore
}
- try {
- return DateFormat.getDateInstance().parse((String) literal).getTime();
- } catch (ParseException e) {
- // best effort. ignore
+ if (TIMESTAMP_PATTERN.matcher(s).matches()) {
+ return DateTimeUtils.timestampStringToUnixDate(s);
}
}
return null;
http://git-wip-us.apache.org/repos/asf/calcite/blob/73544eda/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index bb2a534..fb48557 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -60,12 +60,14 @@ import org.apache.calcite.util.Util;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
+
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
import java.io.IOException;
import java.io.StringWriter;
@@ -84,7 +86,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
final RelOptTable table;
final DruidTable druidTable;
- final List<Interval> intervals;
+ final ImmutableList<Interval> intervals;
final ImmutableList<RelNode> rels;
private static final Pattern VALID_SIG = Pattern.compile("sf?p?a?l?");
@@ -143,6 +145,11 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
if (!isValidSignature(signature)) {
return litmus.fail("invalid signature [{}]", signature);
}
+ for (Interval interval : intervals) {
+ if (interval.getChronology() != ISOChronology.getInstanceUTC()) {
+ return litmus.fail("interval must be UTC", interval);
+ }
+ }
if (rels.isEmpty()) {
return litmus.fail("must have at least one rel");
}
@@ -552,8 +559,8 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
generator.writeStringField("queryType", "timeseries");
generator.writeStringField("dataSource", druidTable.dataSource);
- generator.writeStringField("descending", timeSeriesDirection != null
- && timeSeriesDirection == Direction.DESCENDING ? "true" : "false");
+ generator.writeBooleanField("descending", timeSeriesDirection != null
+ && timeSeriesDirection == Direction.DESCENDING);
generator.writeStringField("granularity", granularity);
writeFieldIf(generator, "filter", jsonFilter);
writeField(generator, "aggregations", aggregations);
@@ -609,7 +616,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
generator.writeStringField("queryType", "select");
generator.writeStringField("dataSource", druidTable.dataSource);
- generator.writeStringField("descending", "false");
+ generator.writeBooleanField("descending", false);
writeField(generator, "intervals", intervals);
writeFieldIf(generator, "filter", jsonFilter);
writeField(generator, "dimensions", translator.dimensions);
@@ -895,23 +902,21 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
for (RelDataTypeField field : query.getRowType().getFieldList()) {
fieldTypes.add(getPrimitive(field));
}
- try {
- final DruidConnectionImpl connection =
- new DruidConnectionImpl(query.druidTable.schema.url,
- query.druidTable.schema.coordinatorUrl);
- final boolean limitQuery = containsLimit(querySpec);
- final DruidConnectionImpl.Page page = new DruidConnectionImpl.Page();
- int previousOffset;
- do {
- previousOffset = page.offset;
- final String queryString =
- querySpec.getQueryString(page.pagingIdentifier, page.offset);
- connection.request(querySpec.queryType, queryString, sink,
- querySpec.fieldNames, fieldTypes, page);
- } while (!limitQuery && page.pagingIdentifier != null && page.offset > previousOffset);
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
+ final DruidConnectionImpl connection =
+ new DruidConnectionImpl(query.druidTable.schema.url,
+ query.druidTable.schema.coordinatorUrl);
+ final boolean limitQuery = containsLimit(querySpec);
+ final DruidConnectionImpl.Page page = new DruidConnectionImpl.Page();
+ int previousOffset;
+ do {
+ previousOffset = page.offset;
+ final String queryString =
+ querySpec.getQueryString(page.pagingIdentifier, page.offset);
+ connection.request(querySpec.queryType, queryString, sink,
+ querySpec.fieldNames, fieldTypes, page);
+ } while (!limitQuery
+ && page.pagingIdentifier != null
+ && page.offset > previousOffset);
}
private static boolean containsLimit(QuerySpec querySpec) {
http://git-wip-us.apache.org/repos/asf/calcite/blob/73544eda/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
index 836941b..efdaa3d 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
@@ -29,7 +29,6 @@ import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.Util;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -39,6 +38,7 @@ import com.google.common.collect.Iterables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
import java.util.List;
import java.util.Map;
@@ -51,13 +51,14 @@ public class DruidTable extends AbstractTable implements TranslatableTable {
public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
public static final Interval DEFAULT_INTERVAL =
- new Interval(new DateTime("1900-01-01"), new DateTime("3000-01-01"));
+ new Interval(new DateTime("1900-01-01", ISOChronology.getInstanceUTC()),
+ new DateTime("3000-01-01", ISOChronology.getInstanceUTC()));
final DruidSchema schema;
final String dataSource;
final RelProtoDataType protoRowType;
final ImmutableSet<String> metricFieldNames;
- final List<Interval> intervals;
+ final ImmutableList<Interval> intervals;
final String timestampFieldName;
/**
@@ -79,8 +80,11 @@ public class DruidTable extends AbstractTable implements TranslatableTable {
this.dataSource = Preconditions.checkNotNull(dataSource);
this.protoRowType = protoRowType;
this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
- this.intervals = Preconditions.checkNotNull(
- Util.first(intervals, ImmutableList.of(DEFAULT_INTERVAL)));
+ this.intervals = intervals != null ? ImmutableList.copyOf(intervals)
+ : ImmutableList.of(DEFAULT_INTERVAL);
+ for (Interval interval : this.intervals) {
+ assert interval.getChronology() == ISOChronology.getInstanceUTC();
+ }
}
/** Creates a {@link DruidTable}
http://git-wip-us.apache.org/repos/asf/calcite/blob/73544eda/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
index c48fbff..46f5e04 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
@@ -26,6 +26,7 @@ import org.apache.calcite.util.Util;
import com.google.common.collect.ImmutableList;
import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -99,9 +100,14 @@ public class DruidTableFactory implements TableFactory {
} else {
c = null;
}
- final Object interval = operand.get("interval");
- final List<Interval> intervals = interval instanceof String
- ? ImmutableList.of(Interval.parse((String) interval)) : null;
+ final Object intervalString = operand.get("interval");
+ final List<Interval> intervals;
+ if (intervalString instanceof String) {
+ intervals = ImmutableList.of(
+ new Interval(intervalString, ISOChronology.getInstanceUTC()));
+ } else {
+ intervals = null;
+ }
return DruidTable.create(druidSchema, dataSourceName, intervals,
fieldBuilder, metricNameBuilder, timestampColumnName, c);
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/73544eda/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index d50319f..39b34ee 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -276,7 +276,7 @@ public class DruidAdapterIT {
+ "from \"foodmart\"\n"
+ "offset 2 fetch next 3 rows only";
final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
- + "'descending':'false','intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ + "'descending':false,'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ "'dimensions':['state_province','product_name'],'metrics':[],'granularity':'all',"
+ "'pagingSpec':{'threshold':16384},'context':{'druid.query.fetch':false}}";
sql(sql)
@@ -288,7 +288,7 @@ public class DruidAdapterIT {
final String sql = "select \"gender\", \"state_province\"\n"
+ "from \"foodmart\" fetch next 3 rows only";
final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
- + "'descending':'false','intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ + "'descending':false,'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ "'dimensions':['gender','state_province'],'metrics':[],'granularity':'all',"
+ "'pagingSpec':{'threshold':3},'context':{'druid.query.fetch':true}}";
sql(sql)
@@ -321,7 +321,7 @@ public class DruidAdapterIT {
+ "where \"product_id\" BETWEEN 1500 AND 1502\n"
+ "order by \"state_province\" desc, \"product_id\"";
final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
- + "'descending':'false','intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ + "'descending':false,'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ "'filter':{'type':'and','fields':["
+ "{'type':'bound','dimension':'product_id','lower':'1500','lowerStrict':false,'alphaNumeric':false},"
+ "{'type':'bound','dimension':'product_id','upper':'1502','upperStrict':false,'alphaNumeric':false}]},"
@@ -366,7 +366,7 @@ public class DruidAdapterIT {
final String sql = "select * from \"foodmart\"\n"
+ "where \"product_id\" = -1";
final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
- + "'descending':'false','intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ + "'descending':false,'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ "'filter':{'type':'selector','dimension':'product_id','value':'-1'},"
+ "'dimensions':['product_id','brand_name','product_name','SKU','SRP',"
+ "'gross_weight','net_weight','recyclable_package','low_fat','units_per_case',"
@@ -401,7 +401,7 @@ public class DruidAdapterIT {
+ "where cast(\"product_id\" as integer) - 1500 BETWEEN 0 AND 2\n"
+ "order by \"state_province\" desc, \"product_id\"";
final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
- + "'descending':'false','intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ + "'descending':false,'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ "'dimensions':['product_id','brand_name','product_name','SKU','SRP','gross_weight',"
+ "'net_weight','recyclable_package','low_fat','units_per_case','cases_per_pallet',"
+ "'shelf_width','shelf_height','shelf_depth','product_class_id','product_subcategory',"
@@ -477,7 +477,7 @@ public class DruidAdapterIT {
@Test public void testCountGroupByEmpty() {
final String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart',"
- + "'descending':'false','granularity':'all',"
+ + "'descending':false,'granularity':'all',"
+ "'aggregations':[{'type':'count','name':'EXPR$0'}],"
+ "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
final String explain = "PLAN=EnumerableInterpreter\n"
@@ -562,7 +562,7 @@ public class DruidAdapterIT {
+ "from \"foodmart\"\n"
+ "group by floor(\"timestamp\" to MONTH)";
String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart',"
- + "'descending':'false','granularity':'MONTH',"
+ + "'descending':false,'granularity':'MONTH',"
+ "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
+ "{'type':'count','name':'C','fieldName':'store_sqft'}],"
+ "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
@@ -578,7 +578,7 @@ public class DruidAdapterIT {
+ "from \"foodmart\"\n"
+ "group by floor(\"timestamp\" to DAY)";
String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart',"
- + "'descending':'false','granularity':'DAY',"
+ + "'descending':false,'granularity':'DAY',"
+ "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
+ "{'type':'count','name':'C','fieldName':'store_sqft'}],"
+ "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
@@ -596,7 +596,7 @@ public class DruidAdapterIT {
+ " \"timestamp\" < '1998-01-01 00:00:00'\n"
+ "group by floor(\"timestamp\" to MONTH)";
String druidQuery = "{'queryType':'timeseries','dataSource':'foodmart',"
- + "'descending':'false','granularity':'MONTH',"
+ + "'descending':false,'granularity':'MONTH',"
+ "'aggregations':[{'type':'longSum','name':'S','fieldName':'unit_sales'},"
+ "{'type':'count','name':'C','fieldName':'store_sqft'}],"
+ "'intervals':['1996-01-01T00:00:00.000Z/1998-01-01T00:00:00.000Z']}";
@@ -771,7 +771,7 @@ public class DruidAdapterIT {
+ "and \"state_province\" = 'WA'";
final String druidQuery = "{'queryType':'select',"
+ "'dataSource':'foodmart',"
- + "'descending':'false',"
+ + "'descending':false,"
+ "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
+ "'filter':{'type':'and','fields':["
+ "{'type':'selector','dimension':'product_name','value':'High Top Dried Mushrooms'},"