You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jc...@apache.org on 2017/04/11 12:43:22 UTC
[1/2] calcite git commit: [CALCITE-1725] Push project aggregate of
time extract to Druid
Repository: calcite
Updated Branches:
refs/heads/master 35cd2abc8 -> b48f634fe
[CALCITE-1725] Push project aggregate of time extract to Druid
* Fix 'testPushAggregateOnTime' test.
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e76d5ee3
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e76d5ee3
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e76d5ee3
Branch: refs/heads/master
Commit: e76d5ee33a3cda0d9ce495e21681f5bede25f129
Parents: 35cd2ab
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Tue Apr 11 13:41:19 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue Apr 11 13:41:19 2017 +0100
----------------------------------------------------------------------
.../adapter/druid/DruidConnectionImpl.java | 21 ++++++++++++++------
.../calcite/adapter/druid/DruidQuery.java | 5 ++---
.../org/apache/calcite/test/DruidAdapterIT.java | 16 +++++++--------
3 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/e76d5ee3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index c736098..a1975eb 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -270,7 +270,20 @@ class DruidConnectionImpl implements DruidConnection {
// Move to next token, which is name's value
JsonToken token = parser.nextToken();
- if (fieldName.equals(DEFAULT_RESPONSE_TIMESTAMP_COLUMN)) {
+
+ boolean isTimestampColumn = fieldName.equals(DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
+ int i = fieldNames.indexOf(fieldName);
+ ColumnMetaData.Rep type = null;
+ if (i < 0) {
+ if (!isTimestampColumn) {
+ // Field not present
+ return;
+ }
+ } else {
+ type = fieldTypes.get(i);
+ }
+
+ if (isTimestampColumn || ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP.equals(type)) {
try {
final Date parse;
// synchronized block to avoid race condition
@@ -285,11 +298,7 @@ class DruidConnectionImpl implements DruidConnection {
}
return;
}
- int i = fieldNames.indexOf(fieldName);
- if (i < 0) {
- return;
- }
- ColumnMetaData.Rep type = fieldTypes.get(i);
+
switch (token) {
case VALUE_NUMBER_INT:
if (type == null) {
http://git-wip-us.apache.org/repos/asf/calcite/blob/e76d5ee3/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index a6c1fa9..1b81f67 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -1081,10 +1081,9 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
}
private ColumnMetaData.Rep getPrimitive(RelDataTypeField field) {
- if (field.getName().equals(query.druidTable.timestampFieldName)) {
- return ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP;
- }
switch (field.getType().getSqlTypeName()) {
+ case TIMESTAMP:
+ return ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP;
case BIGINT:
return ColumnMetaData.Rep.LONG;
case INTEGER:
http://git-wip-us.apache.org/repos/asf/calcite/blob/e76d5ee3/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index db6f8f8..904ecab 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -1506,18 +1506,18 @@ public class DruidAdapterIT {
}
@Test public void testPushAggregateOnTime() {
- String sql = "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" where "
- + "\"product_id\" = 1016 and "
- + "\"timestamp\" < cast('1997-01-03' as timestamp) and \"timestamp\" > cast"
- + "('1990-01-01' as timestamp)" + " group by"
- + "\"timestamp\", \"product_id\" ";
+ String sql = "select \"product_id\", \"timestamp\" as \"time\" from \"foodmart\" "
+ + "where \"product_id\" = 1016 "
+ + "and \"timestamp\" < cast('1997-01-03' as timestamp) "
+ + "and \"timestamp\" > cast('1990-01-01' as timestamp) "
+ + "group by \"timestamp\", \"product_id\" ";
String druidQuery = "{'queryType':'groupBy','dataSource':'foodmart',"
+ "'granularity':'all','dimensions':[{'type':'extraction',"
- + "'dimension':'__time','outputName':'timestamp',"
+ + "'dimension':'__time','outputName':'extract_0',"
+ "'extractionFn':{'type':'timeFormat','format':'yyyy-MM-dd";
sql(sql)
- .returnsUnordered("product_id=1016; time=1997-01-02 00:00:00")
- .queryContains(druidChecker(druidQuery));
+ .queryContains(druidChecker(druidQuery))
+ .returnsUnordered("product_id=1016; time=1997-01-02 00:00:00");
}
@Test public void testPushAggregateOnTimeWithExtractYear() {
[2/2] calcite git commit: [CALCITE-1749] Push filter conditions
partially into Druid
Posted by jc...@apache.org.
[CALCITE-1749] Push filter conditions partially into Druid
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/b48f634f
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/b48f634f
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/b48f634f
Branch: refs/heads/master
Commit: b48f634fe0c90f68c7c047a387814557e5701fa7
Parents: e76d5ee
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Tue Apr 11 13:43:01 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue Apr 11 13:43:01 2017 +0100
----------------------------------------------------------------------
druid/pom.xml | 22 ++--
.../calcite/adapter/druid/DruidRules.java | 102 +++++++++++++------
.../org/apache/calcite/test/DruidAdapterIT.java | 28 +++++
3 files changed, 111 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/b48f634f/druid/pom.xml
----------------------------------------------------------------------
diff --git a/druid/pom.xml b/druid/pom.xml
index 7e433cd..362da10 100644
--- a/druid/pom.xml
+++ b/druid/pom.xml
@@ -57,6 +57,10 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
@@ -86,15 +90,15 @@ limitations under the License.
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
http://git-wip-us.apache.org/repos/asf/calcite/blob/b48f634f/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 8f088bb..36e5b40 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -51,11 +51,15 @@ import org.apache.calcite.runtime.PredicateImpl;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Triple;
+
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -158,11 +162,27 @@ public class DruidRules {
final Filter filter = call.rel(0);
final DruidQuery query = call.rel(1);
final RelOptCluster cluster = filter.getCluster();
+ final RelBuilder relBuilder = call.builder();
final RexBuilder rexBuilder = cluster.getRexBuilder();
- if (!DruidQuery.isValidSignature(query.signature() + 'f')
- || !query.isValidFilter(filter.getCondition())) {
+
+ if (!DruidQuery.isValidSignature(query.signature() + 'f')) {
return;
}
+
+ final List<RexNode> validPreds = new ArrayList<>();
+ final List<RexNode> nonValidPreds = new ArrayList<>();
+ final RexExecutor executor =
+ Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR);
+ final RexSimplify simplify = new RexSimplify(rexBuilder, true, executor);
+ final RexNode cond = simplify.simplify(filter.getCondition());
+ for (RexNode e : RelOptUtil.conjunctions(cond)) {
+ if (query.isValidFilter(e)) {
+ validPreds.add(e);
+ } else {
+ nonValidPreds.add(e);
+ }
+ }
+
// Timestamp
int timestampFieldIdx = -1;
for (int i = 0; i < query.getRowType().getFieldCount(); i++) {
@@ -172,71 +192,89 @@ public class DruidRules {
break;
}
}
- final RexExecutor executor =
- Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR);
- final RexSimplify simplify = new RexSimplify(rexBuilder, true, executor);
- final Pair<List<RexNode>, List<RexNode>> pair =
- splitFilters(rexBuilder, query,
- simplify.simplify(filter.getCondition()), timestampFieldIdx);
- if (pair == null) {
+
+ final Triple<List<RexNode>, List<RexNode>, List<RexNode>> triple =
+ splitFilters(rexBuilder, query, validPreds, nonValidPreds, timestampFieldIdx);
+ if (triple.getLeft().isEmpty() && triple.getMiddle().isEmpty()) {
// We can't push anything useful to Druid.
return;
}
+ final List<RexNode> residualPreds = new ArrayList<>(triple.getRight());
List<LocalInterval> intervals = null;
- if (!pair.left.isEmpty()) {
+ if (!triple.getLeft().isEmpty()) {
intervals = DruidDateTimeUtils.createInterval(
query.getRowType().getFieldList().get(timestampFieldIdx).getType(),
- RexUtil.composeConjunction(rexBuilder, pair.left, false));
+ RexUtil.composeConjunction(rexBuilder, triple.getLeft(), false));
if (intervals == null) {
// We can't push anything useful to Druid.
- return;
+ residualPreds.addAll(triple.getLeft());
}
}
- DruidQuery newDruidQuery = query;
- if (!pair.right.isEmpty()) {
+ if (intervals == null && triple.getMiddle().isEmpty()) {
+ // We can't push anything useful to Druid.
+ return;
+ }
+ RelNode newDruidQuery = query;
+ if (!triple.getMiddle().isEmpty()) {
final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels),
- RexUtil.composeConjunction(rexBuilder, pair.right, false));
+ RexUtil.composeConjunction(rexBuilder, triple.getMiddle(), false));
newDruidQuery = DruidQuery.extendQuery(query, newFilter);
}
if (intervals != null) {
- newDruidQuery = DruidQuery.extendQuery(newDruidQuery, intervals);
+ newDruidQuery = DruidQuery.extendQuery((DruidQuery) newDruidQuery, intervals);
+ }
+ if (!residualPreds.isEmpty()) {
+ newDruidQuery = relBuilder
+ .push(newDruidQuery)
+ .filter(residualPreds)
+ .build();
}
call.transformTo(newDruidQuery);
}
- /** Splits the filter condition in two groups: those that filter on the timestamp column
- * and those that filter on other fields. */
- private static Pair<List<RexNode>, List<RexNode>> splitFilters(final RexBuilder rexBuilder,
- final DruidQuery input, RexNode cond, final int timestampFieldIdx) {
+ /**
+ * Given a list of conditions that contain Druid valid operations and
+ * a list that contains those that contain any non-supported operation,
+ * it outputs a triple with three different categories:
+ * 1-l) condition filters on the timestamp column,
+ * 2-m) condition filters that can be pushed to Druid,
+ * 3-r) condition filters that cannot be pushed to Druid.
+ */
+ private static Triple<List<RexNode>, List<RexNode>, List<RexNode>> splitFilters(
+ final RexBuilder rexBuilder, final DruidQuery input, final List<RexNode> validPreds,
+ final List<RexNode> nonValidPreds, final int timestampFieldIdx) {
final List<RexNode> timeRangeNodes = new ArrayList<>();
- final List<RexNode> otherNodes = new ArrayList<>();
- List<RexNode> conjs = RelOptUtil.conjunctions(cond);
- if (conjs.isEmpty()) {
- // We do not transform
- return null;
- }
+ final List<RexNode> pushableNodes = new ArrayList<>();
+ final List<RexNode> nonPushableNodes = new ArrayList<>(nonValidPreds);
// Number of columns with the dimensions and timestamp
- for (RexNode conj : conjs) {
+ for (RexNode conj : validPreds) {
final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
conj.accept(visitor);
if (visitor.inputPosReferenced.contains(timestampFieldIdx)) {
if (visitor.inputPosReferenced.size() != 1) {
// Complex predicate, transformation currently not supported
- return null;
+ nonPushableNodes.add(conj);
+ } else {
+ timeRangeNodes.add(conj);
}
- timeRangeNodes.add(conj);
} else {
+ boolean filterOnMetrics = false;
for (Integer i : visitor.inputPosReferenced) {
if (input.druidTable.metricFieldNames.contains(
input.getRowType().getFieldList().get(i).getName())) {
// Filter on metrics, not supported in Druid
- return null;
+ filterOnMetrics = true;
+ break;
}
}
- otherNodes.add(conj);
+ if (filterOnMetrics) {
+ nonPushableNodes.add(conj);
+ } else {
+ pushableNodes.add(conj);
+ }
}
}
- return Pair.of(timeRangeNodes, otherNodes);
+ return ImmutableTriple.of(timeRangeNodes, pushableNodes, nonPushableNodes);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/b48f634f/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 904ecab..60952a7 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -1689,6 +1689,34 @@ public class DruidAdapterIT {
.returnsUnordered("EXPR$0=02; dayOfMonth=1016", "EXPR$0=10; dayOfMonth=1016",
"EXPR$0=13; dayOfMonth=1016", "EXPR$0=16; dayOfMonth=1016");
}
+
+ @Test public void testPushComplexFilter() {
+ String sql = "select sum(\"store_sales\") from \"foodmart\" "
+ + "where EXTRACT( year from \"timestamp\") = 1997 and "
+ + "\"cases_per_pallet\" >= 8 and \"cases_per_pallet\" <= 10 and "
+ + "\"units_per_case\" < 15 ";
+ String druidQuery = "{'queryType':'select','dataSource':'foodmart','descending':false,"
+ + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+ + "'filter':{'type':'and',"
+ + "'fields':[{'type':'bound','dimension':'cases_per_pallet','lower':'8',"
+ + "'lowerStrict':false,'ordering':'numeric'},"
+ + "{'type':'bound','dimension':'cases_per_pallet','upper':'10','upperStrict':false,"
+ + "'ordering':'numeric'},{'type':'bound','dimension':'units_per_case','upper':'15',"
+ + "'upperStrict':true,'ordering':'numeric'}]},'dimensions':[],'metrics':['store_sales'],"
+ + "'granularity':'all','pagingSpec':{'threshold':16384,'fromNext':true},"
+ + "'context':{'druid.query.fetch':false}}";
+ sql(sql)
+ .queryContains(druidChecker(druidQuery))
+ .explainContains("PLAN=EnumerableInterpreter\n"
+ + " BindableAggregate(group=[{}], EXPR$0=[SUM($1)])\n"
+ + " BindableFilter(condition=[AND(>=(/INT(Reinterpret($0), 86400000), 1997-01-01), "
+ + "<(/INT(Reinterpret($0), 86400000), 1998-01-01))])\n"
+ + " DruidQuery(table=[[foodmart, foodmart]], "
+ + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
+ + "filter=[AND(>=(CAST($11):BIGINT, 8), <=(CAST($11):BIGINT, 10), "
+ + "<(CAST($10):BIGINT, 15))], projects=[[$0, $90]])\n")
+ .returnsUnordered("EXPR$0=75364.09998679161");
+ }
}
// End DruidAdapterIT.java