Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/13 02:56:24 UTC
[01/11] git commit: DRILL-692: Add Hive UDFs to Drill SQL operator table.
Repository: incubator-drill
Updated Branches:
refs/heads/master cdc5daed5 -> 1b20b6e9e
DRILL-692: Add Hive UDFs to Drill SQL operator table.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e602b2a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e602b2a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e602b2a9
Branch: refs/heads/master
Commit: e602b2a9cd2e7550be4c2fd38e2b5942dc623d41
Parents: cdc5dae
Author: vkorukanti <ve...@gmail.com>
Authored: Sat May 10 12:24:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:38:56 2014 -0700
----------------------------------------------------------------------
.../templates/ObjectInspectorHelper.java | 10 ++-
.../exec/expr/ExpressionTreeMaterializer.java | 2 +-
.../fn/HiveFunctionImplementationRegistry.java | 17 ++++-
.../exec/planner/sql/DrillOperatorTable.java | 13 ++--
.../drill/exec/planner/sql/HiveUDFOperator.java | 70 ++++++++++++++++++++
.../drill/exec/physical/impl/TestHiveUDFs.java | 10 +--
.../record/ExpressionTreeMaterializerTest.java | 3 +-
.../resources/functions/hive/GenericUDF.json | 6 +-
.../drill/jdbc/test/TestHiveScalarUDFs.java | 57 ++++++++++++++++
9 files changed, 169 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
index 23b969d..22a9eb2 100644
--- a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
@@ -70,7 +70,7 @@ public class ObjectInspectorHelper {
JType holderClass = TypeHelper.getHolderType(m, returnType, TypeProtos.DataMode.OPTIONAL);
block.assign(returnValueHolder, JExpr._new(holderClass));
- <#if entry.hiveType == "VARCHAR" || entry.hiveType == "STRING">
+ <#if entry.hiveType == "VARCHAR" || entry.hiveType == "STRING" || entry.hiveType == "BINARY">
block.assign(returnValueHolder.ref("buffer"),
m.directClass(io.netty.buffer.Unpooled.class.getCanonicalName())
.staticInvoke("wrappedBuffer")
@@ -160,6 +160,14 @@ public class ObjectInspectorHelper {
.invoke("setBytes").arg(JExpr.lit(0)).arg(data));
jc._else().assign(returnValueHolder.ref("start"), JExpr.lit(0));
jc._else().assign(returnValueHolder.ref("end"), data.ref("length"));
+ <#elseif entry.hiveType == "BINARY">
+
+ JVar data = jc._else().decl(m.directClass(byte[].class.getCanonicalName()), "data",
+ castedOI.invoke("getPrimitiveJavaObject").arg(returnValue));
+ jc._else().add(returnValueHolder.ref("buffer")
+ .invoke("setBytes").arg(JExpr.lit(0)).arg(data));
+ jc._else().assign(returnValueHolder.ref("start"), JExpr.lit(0));
+ jc._else().assign(returnValueHolder.ref("end"), data.ref("length"));
<#else>
jc._else().assign(returnValueHolder.ref("value"),
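For context, the code this FreeMarker template emits for BINARY (like VARCHAR/STRING) return values copies the UDF's byte[] into a pre-allocated buffer and records start/end offsets. A hand-written sketch of that shape, with an illustrative holder class rather than the actual generated one:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class BinaryHolderSketch {
      // Illustrative stand-in for a Drill holder: a Netty buffer plus offsets.
      public ByteBuf buffer = Unpooled.wrappedBuffer(new byte[256]);
      public int start, end;

      public void set(byte[] data) {
        buffer.setBytes(0, data); // assumes data fits the pre-allocated buffer
        start = 0;
        end = data.length;
      }
    }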
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index fc7fb6a..0267be3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -192,7 +192,7 @@ public class ExpressionTreeMaterializer {
return new HiveFuncHolderExpr(call.getName(), matchedHiveHolder, call.args, call.getPosition());
logFunctionResolutionError(errorCollector, call);
- return null;
+ return NullExpression.INSTANCE;
}
private void logFunctionResolutionError(ErrorCollector errorCollector, FunctionCall call) {
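Returning NullExpression.INSTANCE instead of null means callers of the materializer never need a null check when function resolution fails (the test change further below asserts that the materialized result is a TypedNullConstant). A minimal sketch of this null-object pattern, with illustrative types standing in for Drill's expression interfaces:

    interface Expression {
      String describe();
    }

    final class NullExpression implements Expression {
      static final NullExpression INSTANCE = new NullExpression();
      private NullExpression() {}
      @Override
      public String describe() { return "NULL"; }
    }

    class Resolver {
      Expression resolve(String name) {
        Expression match = lookup(name);
        // A shared sentinel lets callers keep walking the expression tree
        // without null checks at every step.
        return match != null ? match : NullExpression.INSTANCE;
      }
      private Expression lookup(String name) { return null; /* stub */ }
    }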
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
index 634b24f..e5c890e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
@@ -76,9 +76,22 @@ public class HiveFunctionImplementationRegistry {
for(int i=0; i<names.length;i++){
methods.put(names[i], clazz);
+ if (!names[i].toLowerCase().equals(names[i])) {
+ // After the Optiq-Drill conversion of function calls, function names are in lowercase
+ // and we fail to find them in the map. Add a lowercase name entry.
+ methods.put(names[i].toLowerCase(), clazz);
+ }
}
}
+ public ArrayListMultimap<String, Class<? extends GenericUDF>> getGenericUDFs() {
+ return methodsGenericUDF;
+ }
+
+ public ArrayListMultimap<String, Class<? extends UDF>> getUDFs() {
+ return methodsUDF;
+ }
+
/**
* Find the UDF class for given function name and check if it accepts the given input argument
* types. If a match is found, create a holder and return
@@ -127,7 +140,7 @@ public class HiveFunctionImplementationRegistry {
nonDeterministicUDFs.contains(udfClazz));
} catch(IllegalAccessException | InstantiationException e) {
logger.debug("Failed to instantiate class", e);
- } catch(UDFArgumentException e) { /*ignore this*/ }
+ } catch(Exception e) { /*ignore this*/ }
return null;
}
@@ -147,7 +160,7 @@ public class HiveFunctionImplementationRegistry {
returnOI,
Types.optional(ObjectInspectorHelper.getDrillType(returnOI)),
nonDeterministicUDFs.contains(udfClazz));
- } catch(UDFArgumentException e) { /*ignore this*/ }
+ } catch(Exception e) { /*ignore this*/ }
return null;
}
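The registry now indexes each UDF under a lowercase alias as well, because function names arrive lowercased after the Optiq-to-Drill conversion. A self-contained sketch of that dual-case registration, assuming a Guava multimap and illustrative names:

    import com.google.common.collect.ArrayListMultimap;

    class FunctionNameIndex {
      private final ArrayListMultimap<String, Class<?>> methods = ArrayListMultimap.create();

      void register(String[] names, Class<?> clazz) {
        for (String name : names) {
          methods.put(name, clazz);
          // Optiq lowercases function names during conversion, so also
          // index a lowercase alias whenever the declared name differs.
          if (!name.toLowerCase().equals(name)) {
            methods.put(name.toLowerCase(), clazz);
          }
        }
      }
    }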
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index 7c8bce2..772b3b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Sets;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.eigenbase.sql.SqlFunctionCategory;
@@ -33,7 +34,6 @@ import org.eigenbase.sql.fun.SqlStdOperatorTable;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
-import com.google.hive12.common.collect.Sets;
public class DrillOperatorTable extends SqlStdOperatorTable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOperatorTable.class);
@@ -63,10 +63,15 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
}
}
- // TODO: add hive functions.
- }
-
+ for (String name : Sets.union(
+ registry.getHiveRegistry().getGenericUDFs().asMap().keySet(),
+ registry.getHiveRegistry().getUDFs().asMap().keySet())) {
+ SqlOperator op = new HiveUDFOperator(name.toUpperCase());
+ operators.add(op);
+ opMap.put(name, op);
+ }
+ }
@Override
public void lookupOperatorOverloads(SqlIdentifier opName, SqlFunctionCategory category, SqlSyntax syntax, List<SqlOperator> operatorList) {
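The new loop registers one HiveUDFOperator per UDF name, drawn from the union of the generic and simple UDF registries; the operator is added uppercased while opMap keeps the original key. A runnable sketch of that loop with stand-in name sets:

    import java.util.Set;
    import com.google.common.collect.ImmutableSet;
    import com.google.common.collect.Sets;

    public class OperatorNameUnion {
      public static void main(String[] args) {
        // Stand-in name sets; in Drill these come from the Hive registry.
        Set<String> genericUdfNames = ImmutableSet.of("encode", "concat_ws");
        Set<String> simpleUdfNames = ImmutableSet.of("udfdegrees");
        for (String name : Sets.union(genericUdfNames, simpleUdfNames)) {
          // Uppercase registration lets the SQL parser resolve
          // case-insensitive references to the Hive UDF.
          System.out.println("register HiveUDFOperator " + name.toUpperCase());
        }
      }
    }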
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java
new file mode 100644
index 0000000..71860c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.sql;
+
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.sql.SqlCall;
+import org.eigenbase.sql.SqlCallBinding;
+import org.eigenbase.sql.SqlFunction;
+import org.eigenbase.sql.SqlFunctionCategory;
+import org.eigenbase.sql.SqlIdentifier;
+import org.eigenbase.sql.SqlOperandCountRange;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.sql.parser.SqlParserPos;
+import org.eigenbase.sql.type.SqlOperandCountRanges;
+import org.eigenbase.sql.type.SqlOperandTypeChecker;
+import org.eigenbase.sql.type.SqlTypeName;
+import org.eigenbase.sql.validate.SqlValidator;
+import org.eigenbase.sql.validate.SqlValidatorScope;
+
+public class HiveUDFOperator extends SqlFunction {
+
+ public HiveUDFOperator(String name) {
+ super(new SqlIdentifier(name, SqlParserPos.ZERO), DynamicReturnType.INSTANCE, null, new ArgChecker(), null,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION);
+ }
+
+ @Override
+ public RelDataType deriveType(SqlValidator validator, SqlValidatorScope scope, SqlCall call) {
+ return validator.getTypeFactory().createSqlType(SqlTypeName.ANY);
+ }
+
+ /** Argument Checker for variable number of arguments */
+ public static class ArgChecker implements SqlOperandTypeChecker {
+
+ public static ArgChecker INSTANCE = new ArgChecker();
+
+ private SqlOperandCountRange range = SqlOperandCountRanges.any();
+
+ @Override
+ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+ return true;
+ }
+
+ @Override
+ public SqlOperandCountRange getOperandCountRange() {
+ return range;
+ }
+
+ @Override
+ public String getAllowedSignatures(SqlOperator op, String opName) {
+ return opName + "(HiveUDF - Opaque)";
+ }
+ }
+}
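Because ArgChecker accepts any operand count and deriveType reports ANY, the validator treats every Hive UDF call as opaque and defers real type checking to execution. For example, a query of the shape used in the tests below validates even though the operator table knows nothing about the UDF's actual signature:

    SELECT UDFDegrees(CAST(26.89 AS DOUBLE)) FROM cp.`employee.json` LIMIT 1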
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
index eabeda2..b2fa898 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
@@ -101,10 +101,8 @@ public class TestHiveUDFs extends ExecTest {
NullableVar16CharVector concatV = (NullableVar16CharVector) vv.next();
Float4Vector flt1V = (Float4Vector) vv.next();
NullableVar16CharVector format_numberV = (NullableVar16CharVector) vv.next();
-
- /* DRILL-425
NullableVar16CharVector nullableStr1V = ((NullableVar16CharVector) vv.next());
- NullableVar16CharVector upperNullableStr1V = ((NullableVar16CharVector) vv.next()); */
+ NullableVar16CharVector upperNullableStr1V = ((NullableVar16CharVector) vv.next());
for(int i=0; i<exec.getRecordCount(); i++) {
@@ -123,14 +121,12 @@ public class TestHiveUDFs extends ExecTest {
String nullableStr1 = null;
- /* DRILL-425
if (!nullableStr1V.getAccessor().isNull(i))
- nullableStr1 = new String(nullableStr1V.getAccessor().get(i), Charsets.UTF_16);*/
+ nullableStr1 = new String(nullableStr1V.getAccessor().get(i), Charsets.UTF_16);
String upperNullableStr1 = null;
- /* DRILL-425
if (!upperNullableStr1V.getAccessor().isNull(i))
- upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i), Charsets.UTF_16); */
+ upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i), Charsets.UTF_16);
assertEquals(nullableStr1 != null, upperNullableStr1 != null);
if (nullableStr1 != null)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 8a31a27..d07ce85 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -204,7 +205,7 @@ public class ExpressionTreeMaterializerTest extends ExecTest {
ImmutableList.of((LogicalExpression) new FieldReference("test", ExpressionPosition.UNKNOWN) ),
ExpressionPosition.UNKNOWN);
LogicalExpression newExpr = ExpressionTreeMaterializer.materialize(functionCallExpr, batch, ec, registry);
- assertEquals(newExpr, null);
+ assertTrue(newExpr instanceof TypedNullConstant);
assertEquals(1, ec.getErrorCount());
System.out.println(ec.toErrorString());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json b/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
index 7b82570..e849e00 100644
--- a/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
+++ b/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
@@ -16,7 +16,7 @@
{name: "str1", type: "VAR16CHAR", mode: "REQUIRED"},
{name: "str2", type: "VAR16CHAR", mode: "REQUIRED"},
{name: "str3", type: "VAR16CHAR", mode: "REQUIRED"},
- /* DRILL-425 {name: "nullableStr1", type: "VAR16CHAR", mode: "OPTIONAL"}, */
+ {name: "nullableStr1", type: "VAR16CHAR", mode: "OPTIONAL"},
{name: "flt1", type: "FLOAT4", mode: "REQUIRED"}
]}
]
@@ -31,9 +31,9 @@
{ ref: "unix_timestamp", expr: "unix_timestamp()" },
{ ref: "concat", expr: "concat_ws('-', str2, str3)" },
{ ref: "flt1", expr: "flt1" },
- { ref: "format_number", expr: "format_number(cast(flt1 as float8), cast(2 as int))" } /* DRILL-425,
+ { ref: "format_number", expr: "format_number(cast(flt1 as float8), cast(2 as int))" },
{ ref: "nullableStr1", expr: "nullableStr1" },
- { ref: "upperNulableStr1", expr: "upper(nullableStr1)" } */
+ { ref: "upperNulableStr1", expr: "upper(nullableStr1)" }
]
},
{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java
new file mode 100644
index 0000000..76af1b0
--- /dev/null
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.jdbc.test;
+
+
+import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestHiveScalarUDFs {
+
+ @BeforeClass
+ public static void generateHive() throws Exception{
+ new HiveTestDataGenerator().generateTestData();
+ }
+
+ /** Test a hive function that implements the interface {@link org.apache.hadoop.hive.ql.exec.UDF}. */
+ @Test
+ public void simpleUDF() throws Exception {
+ JdbcAssert.withNoDefaultSchema()
+ .sql("SELECT " +
+ "from_unixtime(1237573801) as unix_timestamp, " +
+ "UDFDegrees(cast(26.89 as DOUBLE)) as degrees " +
+ "FROM cp.`employee.json` LIMIT 1")
+ .returns("unix_timestamp=2009-03-20 11:30:01; degrees=1540.6835111067835");
+ }
+
+ /** Test a hive function that implements the interface {@link org.apache.hadoop.hive.ql.udf.generic.GenericUDF}. */
+ @Test
+ public void simpleGenericUDF() throws Exception{
+ JdbcAssert.withNoDefaultSchema()
+ .sql("SELECT CAST(" +
+ "encode('text', 'UTF-8') " +
+ "AS VARCHAR(5)) " +
+ "FROM cp.`employee.json` LIMIT 1")
+ .returns("EXPR$0=text");
+ }
+}
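Note how simpleGenericUDF wraps the UDF call in a CAST: since HiveUDFOperator reports type ANY at validation time, an explicit cast appears to be what pins the result type down at the JDBC boundary:

    SELECT CAST(encode('text', 'UTF-8') AS VARCHAR(5)) FROM cp.`employee.json` LIMIT 1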
[02/11] git commit: DRILL-683: Qualify HBase scan with specified columns even if row_key is required.
Posted by ja...@apache.org.
DRILL-683: Qualify HBase scan with specified columns even if row_key is required.
+ Added some log messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/49d53335
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/49d53335
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/49d53335
Branch: refs/heads/master
Commit: 49d533355e6cb98c623f68c3b90331c62ecbc270
Parents: e602b2a
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 02:19:36 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:45:59 2014 -0700
----------------------------------------------------------------------
.../exec/store/hbase/DrillHBaseConstants.java | 2 ++
.../exec/store/hbase/HBaseRecordReader.java | 21 ++++++--------------
.../exec/store/hbase/HBaseScanBatchCreator.java | 7 +++----
.../store/hbase/HBaseStoragePluginConfig.java | 12 +++++++----
.../drill/exec/store/hbase/HBaseSubScan.java | 8 ++++----
...base_scan_screen_physical_column_select.json | 2 +-
6 files changed, 24 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index 7969c45..a86797b 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -24,4 +24,6 @@ public interface DrillHBaseConstants {
static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
+ static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index af059f5..381cd6a 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -22,8 +22,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -43,6 +41,7 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
@@ -70,7 +68,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
private Result leftOver;
private VarBinaryVector rowKeyVector;
private SchemaPath rowKeySchemaPath;
- private HTable table;
public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
@@ -110,15 +107,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
}
try {
- if (rowKeySchemaPath != null) {
- /* if ROW_KEY was requested, we cannot qualify the scan with columns,
* otherwise HBase will omit the entire row if all of the specified columns do
* not exist for that row. Eventually we may want to use Family and/or Qualifier
* Filters in such a case but that would mean additional processing at the server.
- */
- scan.setFamilyMap(new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR));
- }
-
Filter scanFilter = subScanSpec.getScanFilter();
if (rowKeyOnly) {
/* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
@@ -134,12 +122,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
scan.setFilter(scanFilter);
scan.setCaching(TARGET_RECORD_COUNT);
- table = new HTable(conf, subScanSpec.getTableName());
+ logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
+ subScanSpec.getTableName(), conf.get(HConstants.ZOOKEEPER_QUORUM),
+ conf.get(HBASE_ZOOKEEPER_PORT), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ HTable table = new HTable(conf, subScanSpec.getTableName());
resultScanner = table.getScanner(scan);
try {
table.close();
} catch (IOException e) {
- logger.warn("Failure while closing HBase table", e);
+ logger.warn("Failure while closing HBase table: " + subScanSpec.getTableName(), e);
}
} catch (IOException e1) {
throw new DrillRuntimeException(e1);
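The removed block used to clear the scan's family map whenever row_key was projected; with this change the scan stays qualified with the requested columns. A minimal sketch of column-qualified scan construction against the 0.94-era HBase client API, with family and qualifier names mirroring the test JSON below:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class QualifiedScanSketch {
      public static Scan build() {
        Scan scan = new Scan();
        // Naming explicit columns lets the region server skip all
        // unrequested cells instead of shipping whole rows.
        scan.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("c1"));
        scan.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("c2"));
        return scan;
      }
    }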
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 0a4eabe..661e1b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -37,11 +36,10 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<RecordReader> readers = Lists.newArrayList();
- Configuration config = ((HBaseStoragePluginConfig) subScan.getStorageConfig()).getHBaseConf();
- for(HBaseSubScan.HBaseSubScanSpec e : subScan.getRegionScanSpecList()){
+ for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
try {
readers.add(
- new HBaseRecordReader(config, e, subScan.getColumns(), context)
+ new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, subScan.getColumns(), context)
);
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
@@ -49,4 +47,5 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
}
return new ScanBatch(subScan, context, readers.iterator());
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index b6ff069..5a434d6 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase;
import org.apache.drill.common.logical.StoragePluginConfigBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -29,7 +30,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
@JsonTypeName("hbase")
-public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
+public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
@JsonProperty
public String zookeeperQuorum;
@@ -47,9 +49,11 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
this.zookeeperPort = zookeeperPort;
this.hbaseConf = HBaseConfiguration.create();
+ logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}' node '{}'.",
+ zookeeperQuorum, zookeeperPort, hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (zookeeperQuorum != null && zookeeperQuorum.length() != 0) {
- hbaseConf.set("hbase.zookeeper.quorum", zookeeperQuorum);
- hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum);
+ hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
}
this.hbaseConfKey = new HConnectionKey(hbaseConf);
}
@@ -79,7 +83,7 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
@VisibleForTesting
public void setZookeeperPort(int zookeeperPort) {
this.zookeeperPort = zookeeperPort;
- hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+ hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
}
}
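A self-contained sketch of the ZooKeeper configuration pattern the plugin config now follows, assuming the 0.94-era client API; the client-port key is spelled out here because this HBase version has no HConstants entry for it, which is why the plugin defines its own HBASE_ZOOKEEPER_PORT constant:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class HBaseConfSketch {
      public static Configuration forQuorum(String quorum, int port) {
        Configuration conf = HBaseConfiguration.create();
        if (quorum != null && quorum.length() != 0) {
          conf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
          conf.setInt("hbase.zookeeper.property.clientPort", port);
        }
        return conf;
      }
    }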
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 6b87817..3f20087 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -48,7 +48,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
@JsonProperty
- public final StoragePluginConfig storage;
+ public final HBaseStoragePluginConfig storage;
@JsonIgnore
private final HBaseStoragePlugin hbaseStoragePlugin;
private final List<HBaseSubScanSpec> regionScanSpecList;
@@ -61,7 +61,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
this.regionScanSpecList = regionScanSpecList;
- this.storage = storage;
+ this.storage = (HBaseStoragePluginConfig) storage;
this.columns = columns;
}
@@ -78,7 +78,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
}
@JsonIgnore
- public StoragePluginConfig getStorageConfig() {
+ public HBaseStoragePluginConfig getStorageConfig() {
return storage;
}
@@ -114,7 +114,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, regionScanSpecList, columns);
+ return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index c64dc97..dc08031 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -19,7 +19,7 @@
"zookeeperPort" : 2181
},
columns: [
- "`f2`.c1", "`f2`.c2"
+ "`f2`.c1", "`f2`.c2", "row_key"
]
},
{
[05/11] git commit: DRILL-695: Push down column value predicates into HBase scan
Posted by ja...@apache.org.
DRILL-695: Push down column value predicates into HBase scan
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fafb5761
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fafb5761
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fafb5761
Branch: refs/heads/master
Commit: fafb57615d4efba8a31516c66d86df5926d3595f
Parents: 6e6c661
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 17:20:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:47:00 2014 -0700
----------------------------------------------------------------------
.../exec/store/hbase/HBaseFilterBuilder.java | 215 +++++++++++++++----
.../drill/exec/store/hbase/HBaseGroupScan.java | 23 ++
.../store/hbase/HBasePushFilterIntoScan.java | 34 ++-
.../exec/store/hbase/HBaseRecordReader.java | 15 +-
.../drill/exec/store/hbase/HBaseUtils.java | 71 +++++-
.../org/apache/drill/hbase/BaseHBaseTest.java | 11 +-
.../org/apache/drill/hbase/HBaseTestsSuite.java | 7 +-
.../drill/hbase/TestHBaseFilterPushDown.java | 45 +++-
.../drill/hbase/TestHBaseProjectPushDown.java | 2 +-
.../apache/drill/hbase/TestTableGenerator.java | 6 +-
10 files changed, 353 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
index b39ee1d..0e0ccf5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -20,16 +20,20 @@ package org.apache.drill.exec.store.hbase;
import java.util.Arrays;
import org.apache.drill.common.expression.CastExpression;
-import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -37,41 +41,43 @@ import com.google.common.collect.ImmutableMap.Builder;
public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
- private static final ImmutableMap<String, String> RELATIONAL_FUNCTIONS_TRANSPOSE_MAP;
- static {
- Builder<String, String> builder = ImmutableMap.builder();
- RELATIONAL_FUNCTIONS_TRANSPOSE_MAP = builder
- .put("equal", "equal")
- .put("not_equal", "not_equal")
- .put("greater_than_or_equal_to", "less_than_or_equal_to")
- .put("greater_than", "less_than")
- .put("less_than_or_equal_to", "greater_than_or_equal_to")
- .put("less_than", "greater_than")
- .build();
+ final private HBaseGroupScan groupScan;
+
+ final private LogicalExpression le;
+
+ private boolean allExpressionsConverted = true;
+
+ HBaseFilterBuilder(HBaseGroupScan groupScan, LogicalExpression le) {
+ this.groupScan = groupScan;
+ this.le = le;
}
- private HBaseScanSpec scanSpec;
+ public HBaseScanSpec parseTree() {
+ return mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), le.accept(this, null));
+ }
- HBaseFilterBuilder(HBaseScanSpec hbaseScanSpec) {
- this.scanSpec = hbaseScanSpec;
+ public boolean isAllExpressionsConverted() {
+ return allExpressionsConverted;
}
- static HBaseScanSpec getHBaseScanSpec(HBaseScanSpec hbaseScanSpec, LogicalExpression e) {
- HBaseFilterBuilder filterBuilder = new HBaseFilterBuilder(hbaseScanSpec);
- return e.accept(filterBuilder, null);
+ @Override
+ public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+ allExpressionsConverted = false;
+ return null;
}
@Override
- public HBaseScanSpec visitFunctionCall(FunctionCall call, Void filterSet) throws RuntimeException {
+ public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+ HBaseScanSpec nodeScanSpec = null;
String functionName = call.getName();
ImmutableList<LogicalExpression> args = call.args;
- if (args.size() == 2 && RELATIONAL_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)) {
+ if (COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)) {
LogicalExpression nameArg = args.get(0);
LogicalExpression valueArg = args.get(1);
if (nameArg instanceof QuotedString) {
valueArg = nameArg;
nameArg = args.get(1);
- functionName = RELATIONAL_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+ functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
}
while (nameArg instanceof CastExpression
@@ -79,54 +85,171 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
nameArg = ((CastExpression) nameArg).getInput();
}
- if (nameArg instanceof FieldReference
- && ((FieldReference) nameArg).getAsUnescapedPath().equals(ROW_KEY)
- && valueArg instanceof QuotedString) {
- return createHBaseScanSpec(functionName , ((QuotedString) valueArg).value.getBytes());
+ if (nameArg instanceof SchemaPath && valueArg instanceof QuotedString) {
+ nodeScanSpec = createHBaseScanSpec(functionName, (SchemaPath) nameArg, ((QuotedString) valueArg).value.getBytes());
+ }
+ } else {
+ switch (functionName) {
+ case "booleanAnd":
+ case "booleanOr":
+ HBaseScanSpec leftScanSpec = args.get(0).accept(this, null);
+ HBaseScanSpec rightScanSpec = args.get(1).accept(this, null);
+ if (leftScanSpec != null && rightScanSpec != null) {
+ nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec, rightScanSpec);
+ } else {
+ allExpressionsConverted = false;
+ if ("booleanAnd".equals(functionName)) {
+ nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+ }
+ }
+ break;
+ case "isnotnull":
+ case "isNotNull":
+ case "is not null":
+ case "isnull":
+ case "isNull":
+ case "is null":
+ /*
+ * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
+ * causes a filter with NullComparator to fail. Enable only if specified in
+ * the configuration (after ensuring that the HBase cluster has the fix).
+ */
+ if (groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false)) {
+ if (args.get(0) instanceof SchemaPath) {
+ nodeScanSpec = createHBaseScanSpec(functionName, ((SchemaPath) args.get(0)), null);
+ }
+ }
}
}
- return null;
+
+ if (nodeScanSpec == null) {
+ allExpressionsConverted = false;
+ }
+ return nodeScanSpec;
+ }
+
+ private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) {
+ Filter newFilter = null;
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+ switch (functionName) {
+ case "booleanAnd":
+ newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.filter, HBaseUtils.LAST_FILTER, rightScanSpec.filter);
+ startRow = HBaseUtils.maxOfStartRows(leftScanSpec.startRow, rightScanSpec.startRow);
+ stopRow = HBaseUtils.minOfStopRows(leftScanSpec.stopRow, rightScanSpec.stopRow);
+ break;
+ case "booleanOr":
+ newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.filter, HBaseUtils.LAST_FILTER, rightScanSpec.filter);
+ startRow = HBaseUtils.minOfStartRows(leftScanSpec.startRow, rightScanSpec.startRow);
+ stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.stopRow, rightScanSpec.stopRow);
+ }
+ return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter);
}
- private HBaseScanSpec createHBaseScanSpec(String functionName, byte[] value) {
- byte[] startRow = scanSpec.getStartRow();
- byte[] stopRow = scanSpec.getStopRow();
- Filter filter = null;
+ private HBaseScanSpec createHBaseScanSpec(String functionName, SchemaPath field, byte[] fieldValue) {
+ boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
+ if (!(isRowKey
+ || (field.getRootSegment().getChild() != null && field.getRootSegment().getChild().isNamed()))) {
+ /*
+ * if the field in this function is neither the row_key nor a qualified HBase column, return.
+ */
+ return null;
+ }
+
+ CompareOp compareOp = null;
+ boolean isNullTest = false;
+ WritableByteArrayComparable comparator = new BinaryComparator(fieldValue);
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
switch (functionName) {
case "equal":
- startRow = stopRow = value;
+ compareOp = CompareOp.EQUAL;
+ if (isRowKey) {
+ startRow = stopRow = fieldValue;
+ }
break;
case "not_equal":
- filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(value));
+ compareOp = CompareOp.NOT_EQUAL;
break;
case "greater_than_or_equal_to":
- startRow = value;
- filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(value));
+ compareOp = CompareOp.GREATER_OR_EQUAL;
+ if (isRowKey) {
+ startRow = fieldValue;
+ }
break;
case "greater_than":
- startRow = value;
- filter = new RowFilter(CompareOp.GREATER, new BinaryComparator(value));
+ compareOp = CompareOp.GREATER;
+ if (isRowKey) {
+ startRow = fieldValue;
+ }
break;
case "less_than_or_equal_to":
- stopRow = Arrays.copyOf(value, value.length+1); // stopRow should be just greater than 'value'
- filter = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(value));
+ compareOp = CompareOp.LESS_OR_EQUAL;
+ if (isRowKey) {
+ // stopRow should be just greater than 'value'
+ stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+ }
break;
case "less_than":
- stopRow = value;
- filter = new RowFilter(CompareOp.LESS, new BinaryComparator(value));
+ compareOp = CompareOp.LESS;
+ if (isRowKey) {
+ stopRow = fieldValue;
+ }
+ break;
+ case "isnull":
+ case "isNull":
+ case "is null":
+ if (isRowKey) {
+ return null;
+ }
+ isNullTest = true;
+ compareOp = CompareOp.EQUAL;
+ comparator = new NullComparator();
break;
- default:
+ case "isnotnull":
+ case "isNotNull":
+ case "is not null":
+ if (isRowKey) {
+ return null;
+ }
+ compareOp = CompareOp.NOT_EQUAL;
+ comparator = new NullComparator();
break;
}
- if (filter != null || startRow != scanSpec.getStartRow() || stopRow != scanSpec.getStopRow()) {
- return new HBaseScanSpec(scanSpec.getTableName(), startRow, stopRow, filter);
+
+ if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
+ Filter filter = null;
+ if (isRowKey) {
+ if (compareOp != null) {
+ filter = new RowFilter(compareOp, comparator);
+ }
+ } else {
+ byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
+ byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
+ filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
+ ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
+ if (!isNullTest) {
+ ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
+ }
+ }
+ return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
}
+ // else
return null;
}
- @Override
- public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
- return null;
+ private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+ static {
+ Builder<String, String> builder = ImmutableMap.builder();
+ COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+ .put("equal", "equal")
+ .put("not_equal", "not_equal")
+ .put("greater_than_or_equal_to", "less_than_or_equal_to")
+ .put("greater_than", "less_than")
+ .put("less_than_or_equal_to", "greater_than_or_equal_to")
+ .put("less_than", "greater_than")
+ .build();
}
}
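The transpose map exists because a predicate can arrive literal-first ('v' = col); swapping the arguments requires flipping asymmetric comparisons, while equality-like functions map to themselves. A compact, runnable restatement of the idea:

    import com.google.common.collect.ImmutableMap;

    public class CompareTranspose {
      // 5 < row_key denotes the same predicate as row_key > 5, so ordered
      // comparisons map to their mirror image when operands are swapped.
      private static final ImmutableMap<String, String> TRANSPOSE =
          ImmutableMap.<String, String>builder()
              .put("equal", "equal")
              .put("not_equal", "not_equal")
              .put("greater_than", "less_than")
              .put("less_than", "greater_than")
              .put("greater_than_or_equal_to", "less_than_or_equal_to")
              .put("less_than_or_equal_to", "greater_than_or_equal_to")
              .build();

      public static void main(String[] args) {
        System.out.println(TRANSPOSE.get("greater_than")); // prints: less_than
      }
    }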
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index de60741..809aa86 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
@@ -74,6 +75,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
private NavigableMap<HRegionInfo,ServerName> regionsToScan;
private HTableDescriptor hTableDesc;
+ private boolean filterPushedDown = false;
+
@JsonCreator
public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
@JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@@ -258,6 +261,16 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
return storagePlugin;
}
+ @JsonIgnore
+ public Configuration getHBaseConf() {
+ return getStorageConfig().getHBaseConf();
+ }
+
+ @JsonIgnore
+ public String getTableName() {
+ return getHBaseScanSpec().getTableName();
+ }
+
@Override
public String getDigest() {
return toString();
@@ -290,4 +303,14 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
return true;
}
+ @JsonIgnore
+ public void setFilterPushedDown(boolean b) {
+ this.filterPushedDown = true;
+ }
+
+ @JsonIgnore
+ public boolean isFilterPushedDown() {
+ return filterPushedDown;
+ }
+
}
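One small oddity in the hunk above: setFilterPushedDown(boolean b) assigns true regardless of its argument. The only caller passes true, so behavior is unaffected today, but a sketch of the presumably intended setter would be:

    public class FilterPushDownFlag {
      private boolean filterPushedDown = false;

      // Honor the argument instead of hard-coding true.
      public void setFilterPushedDown(boolean filterPushedDown) {
        this.filterPushedDown = filterPushedDown;
      }

      public boolean isFilterPushedDown() {
        return filterPushedDown;
      }
    }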
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 50b2813..2b419d4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -19,16 +19,18 @@
package org.apache.drill.exec.store.hbase;
import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.rex.RexNode;
+import com.google.common.collect.ImmutableList;
+
public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
public static final StoragePluginOptimizerRule INSTANCE = new HBasePushFilterIntoScan();
@@ -43,14 +45,36 @@ public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
final RexNode condition = filter.getCondition();
HBaseGroupScan groupScan = (HBaseGroupScan)scan.getGroupScan();
+ if (groupScan.isFilterPushedDown()) {
+ /*
+ * The rule can get triggered again due to the transformed "scan => filter" sequence
+ * created by the earlier execution of this rule when we could not do a complete
+ * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+ * this flag to not do a re-processing of the rule on the already transformed call.
+ */
+ return;
+ }
+
LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(), scan, condition);
- HBaseScanSpec newScanSpec = HBaseFilterBuilder.getHBaseScanSpec(groupScan.getHBaseScanSpec(), conditionExp);
+ HBaseFilterBuilder hbaseFilterBuilder = new HBaseFilterBuilder(groupScan, conditionExp);
+ HBaseScanSpec newScanSpec = hbaseFilterBuilder.parseTree();
if (newScanSpec == null) {
- return; //no filter pushdown ==> No transformation.
+ return; //no filter pushdown ==> No transformation.
}
- final GroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+
+ final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+ newGroupsScan.setFilterPushedDown(true);
+
final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
- call.transformTo(newScanPrel);
+ if (hbaseFilterBuilder.isAllExpressionsConverted()) {
+ /*
+ * Since we could convert the entire filter condition expression into an HBase filter,
+ * we can eliminate the filter operator altogether.
+ */
+ call.transformTo(newScanPrel);
+ } else {
+ call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of((RelNode)newScanPrel)));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 381cd6a..58285db 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -47,9 +47,6 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import com.google.common.base.Stopwatch;
@@ -107,19 +104,17 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
}
try {
- Filter scanFilter = subScanSpec.getScanFilter();
+ scan.setFilter(subScanSpec.getScanFilter());
if (rowKeyOnly) {
/* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
* to fetch only one KV from each row. If a filter is already part of this
- * scan, add the FirstKeyOnlyFilter as the SECOND filter of a MUST_PASS_ALL
+ * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL
* FilterList.
*/
- Filter firstKeyFilter = new FirstKeyOnlyFilter();
- scanFilter = (scanFilter == null)
- ? firstKeyFilter
- : new FilterList(Operator.MUST_PASS_ALL, scanFilter, firstKeyFilter);
+ scan.setFilter(
+ HBaseUtils.andFilterAtIndex(scan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter())
+ );
}
- scan.setFilter(scanFilter);
scan.setCaching(TARGET_RECORD_COUNT);
logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
index bebdda7..10a7e64 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
@@ -23,17 +23,24 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
+import java.util.List;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.common.collect.Lists;
+
public class HBaseUtils {
static final ParseFilter FILTER_PARSEER = new ParseFilter();
+ static final int FIRST_FILTER = 0;
+ static final int LAST_FILTER = -1;
+
public static byte[] getBytes(String str) {
return str == null ? HConstants.EMPTY_BYTE_ARRAY : Bytes.toBytes(str);
}
@@ -41,7 +48,7 @@ public class HBaseUtils {
static Filter parseFilterString(String filterString) {
if (filterString == null) return null;
try {
- return FILTER_PARSEER.parseFilterString(filterString);
+ return FILTER_PARSEER.parseFilterString(filterString);
} catch (CharacterCodingException e) {
throw new DrillRuntimeException("Error parsing filter string: " + filterString, e);
}
@@ -66,4 +73,66 @@ public class HBaseUtils {
}
}
+ public static Filter andFilterAtIndex(Filter currentFilter, int index, Filter newFilter) {
+ if (currentFilter == null) {
+ return newFilter;
+ } else if (newFilter == null) {
+ return currentFilter;
+ }
+
+ List<Filter> allFilters = Lists.newArrayList();
+ if (currentFilter instanceof FilterList && ((FilterList)currentFilter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+ allFilters.addAll(((FilterList)currentFilter).getFilters());
+ } else {
+ allFilters.add(currentFilter);
+ }
+ allFilters.add((index == LAST_FILTER ? allFilters.size() : index), newFilter);
+ return new FilterList(FilterList.Operator.MUST_PASS_ALL, allFilters);
+ }
+
+ public static Filter orFilterAtIndex(Filter currentFilter, int index, Filter newFilter) {
+ if (currentFilter == null) {
+ return newFilter;
+ } else if (newFilter == null) {
+ return currentFilter;
+ }
+
+ List<Filter> allFilters = Lists.newArrayList();
+ if (currentFilter instanceof FilterList && ((FilterList)currentFilter).getOperator() == FilterList.Operator.MUST_PASS_ONE) {
+ allFilters.addAll(((FilterList)currentFilter).getFilters());
+ } else {
+ allFilters.add(currentFilter);
+ }
+ allFilters.add((index == LAST_FILTER ? allFilters.size() : index), newFilter);
+ return new FilterList(FilterList.Operator.MUST_PASS_ONE, allFilters);
+ }
+
+ public static byte[] maxOfStartRows(byte[] left, byte[] right) {
+ if (left == null || left.length == 0 || right == null || right.length == 0) {
+ return (left == null || left.length == 0) ? right : left;
+ }
+ return Bytes.compareTo(left, right) > 0 ? left : right;
+ }
+
+ public static byte[] minOfStartRows(byte[] left, byte[] right) {
+ if (left == null || left.length == 0 || right == null || right.length == 0) {
+ return HConstants.EMPTY_BYTE_ARRAY;
+ }
+ return Bytes.compareTo(left, right) < 0 ? left : right;
+ }
+
+ public static byte[] maxOfStopRows(byte[] left, byte[] right) {
+ if (left == null || left.length == 0 || right == null || right.length == 0) {
+ return HConstants.EMPTY_BYTE_ARRAY;
+ }
+ return Bytes.compareTo(left, right) > 0 ? left : right;
+ }
+
+ public static byte[] minOfStopRows(byte[] left, byte[] right) {
+ if (left == null || left.length == 0 || right == null || right.length == 0) {
+ return (left == null || left.length == 0) ? right : left;
+ }
+ return Bytes.compareTo(left, right) < 0 ? left : right;
+ }
+
}
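For orientation, a minimal sketch of how these helpers could compose two pushed-down predicates into a single HBase Scan. The filter strings and row keys are illustrative only, and the sketch class is assumed to live in the same package, since parseFilterString and LAST_FILTER are package-private:

package org.apache.drill.exec.store.hbase;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilterMergeSketch {
  public static Scan buildScan() {
    // Parse two predicates written in HBase's filter language.
    Filter byRowKey = HBaseUtils.parseFilterString("RowFilter (>, 'binary:b4')");
    Filter byColumn = HBaseUtils.parseFilterString(
        "SingleColumnValueFilter ('f', 'c1', >=, 'binary:1')");

    // AND the predicates; LAST_FILTER (-1) appends the new filter at the end
    // of the resulting MUST_PASS_ALL FilterList.
    Filter combined = HBaseUtils.andFilterAtIndex(byRowKey, HBaseUtils.LAST_FILTER, byColumn);

    Scan scan = new Scan();
    scan.setFilter(combined);
    // Intersect two candidate row ranges: keep the later start row and the
    // earlier stop row.
    scan.setStartRow(HBaseUtils.maxOfStartRows(Bytes.toBytes("a2"), Bytes.toBytes("b4")));
    scan.setStopRow(HBaseUtils.minOfStopRows(Bytes.toBytes("b9"), Bytes.toBytes("c5")));
    return scan;
  }
}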
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index a68cf70..f170025 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -87,7 +87,8 @@ public class BaseHBaseTest extends BaseTestQuery {
}
protected void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
- sql = sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1);
+ sql = canonizeSQL(sql);
+ System.out.println("Running query:\n" + sql);
List<QueryResultBatch> results = testSqlWithResults(sql);
printResultAndVerifyRowCount(results, expectedRowCount);
}
@@ -106,7 +107,13 @@ public class BaseHBaseTest extends BaseTestQuery {
result.release();
}
System.out.println("Total record count: " + rowCount);
- Assert.assertEquals(expectedRowCount, rowCount);
+ if (expectedRowCount != -1) {
+ Assert.assertEquals(expectedRowCount, rowCount);
+ }
+ }
+
+ protected String canonizeSQL(String sql) {
+ return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1);
}
}
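The relaxed assertion lets a test execute and print a query without pinning the row count; the ignored convert_from test further below relies on this. A sketch of the convention (hypothetical method inside a BaseHBaseTest subclass):

// Passing -1 as expectedRowCount runs and prints the query but skips the
// row-count assertion; [TABLE_NAME] is substituted by canonizeSQL().
@Test
public void testPrintWithoutRowCountCheck() throws Exception {
  runSQLVerifyCount("SELECT * FROM hbase.`[TABLE_NAME]` tableName", -1);
}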
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 32d7aaa..0e9d1cf 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -124,7 +124,12 @@ public class HBaseTestsSuite {
}
private static void createTestTables() throws Exception {
- TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 2);
+ /*
+ * We are seeing some issues with the (Drill) Filter operator if a group scan spans
+ * multiple fragments. Hence the number of regions in the HBase table is set to 1.
+ * We will revert to multiple regions once the issue is resolved.
+ */
+ TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 1);
}
private static void cleanupTestTables() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 6e64c4a..6dd418c 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,37 +17,64 @@
*/
package org.apache.drill.hbase;
+import org.junit.Ignore;
import org.junit.Test;
public class TestHBaseFilterPushDown extends BaseHBaseTest {
@Test
- public void testFilterPushDownRowKeyEqual() throws Exception{
+ public void testFilterPushDownRowKeyEqual() throws Exception {
runSQLVerifyCount("SELECT\n"
- + " tableName.*\n"
+ + " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
- + " WHERE tableName.row_key = 'b4'"
+ + "WHERE\n"
+ + " row_key = 'b4'"
, 1);
}
@Test
- public void testFilterPushDownRowKeyGreaterThan() throws Exception{
+ public void testFilterPushDownRowKeyGreaterThan() throws Exception {
runSQLVerifyCount("SELECT\n"
- + " tableName.*\n"
+ + " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
- + " WHERE tableName.row_key > 'b4'"
+ + "WHERE\n"
+ + " row_key > 'b4'"
, 2);
}
@Test
- public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception{
+ public void testFilterPushDownMultiColumns() throws Exception {
runSQLVerifyCount("SELECT\n"
- + " tableName.*\n"
+ + " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
- + " WHERE 'b4' >= tableName.row_key"
+ + "WHERE\n"
+ + " (row_key >= 'b5' OR row_key <= 'a2') AND (f['c1'] >= '1' OR f['c1'] is null)"
+ , 4);
+ }
+
+ @Test
+ @Ignore("Until convert_from() functions are working.")
+ public void testFilterPushDownConvertExpression() throws Exception {
+ runSQLVerifyCount("SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " hbase.`[TABLE_NAME]` tableName\n"
+ + "WHERE\n"
+ + " convert_from(row_key, 'INT_BE') > 12"
+ , -1);
+ }
+
+ @Test
+ public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception {
+ runSQLVerifyCount("SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " hbase.`[TABLE_NAME]` tableName\n"
+ + "WHERE\n"
+ + " 'b4' >= row_key"
, 4);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index 0d91454..88194d5 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -43,7 +43,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
public void testRowKeyAndColumnPushDown() throws Exception{
setColumnWidth(9);
runSQLVerifyCount("SELECT\n"
- + "row_key, f['c1']*31 as `f[c1]*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n"
+ + "row_key, f['c1']*31 as `f['c1']*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName"
, 6);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
index 487c306..3678c78 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
@@ -42,7 +42,11 @@ public class TestTableGenerator {
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor("f"));
desc.addFamily(new HColumnDescriptor("f2"));
- admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
+ if (numberRegions > 1) {
+ admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
+ } else {
+ admin.createTable(desc);
+ }
HTable table = new HTable(admin.getConfiguration(), tableName);
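The branch is needed because an HBase table with N regions is created from N-1 split keys, so the single-region case must call the overload without splits. For illustration, reusing the admin and desc variables above with hypothetical keys:

// Three regions require two split keys; the region boundaries become
// (-inf, "b"), ["b", "c"), and ["c", +inf).
byte[][] splits = { Bytes.toBytes("b"), Bytes.toBytes("c") };
admin.createTable(desc, splits);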
[07/11] git commit: DRILL-567: Maven RAT plugin - Ignore all files inside a hidden directory ".*/**"
Posted by ja...@apache.org.
DRILL-567: Maven RAT plugin - Ignore all files inside a hidden directory ".*/**"
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8a290ea6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8a290ea6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8a290ea6
Branch: refs/heads/master
Commit: 8a290ea6fe873db446970293679ed0d81ee55911
Parents: fb973c6
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Thu Apr 24 14:32:54 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:48:17 2014 -0700
----------------------------------------------------------------------
pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a290ea6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d135c56..f567b64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
<exclude>**/*.iml</exclude>
<exclude>**/*.tdd</exclude>
<exclude>**/*.project</exclude>
+ <exclude>.*/**</exclude>
</excludes>
</configuration>
</plugin>
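For reference, ".*/**" is an Ant-style pattern that should match every file under a top-level directory whose name begins with a dot, so working directories such as .git/ and .idea/ are skipped by the RAT license check.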
[08/11] git commit: DRILL-691: Function alias to be able to cast to TIMESTAMPTZ data type
Posted by ja...@apache.org.
DRILL-691: Function alias to be able to cast to TIMESTAMPTZ data type
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a50ab2b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a50ab2b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a50ab2b9
Branch: refs/heads/master
Commit: a50ab2b9bf2131ba44dd566343a1cac0d751319d
Parents: 8a290ea
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 11 18:18:48 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 12:13:06 2014 -0700
----------------------------------------------------------------------
exec/java-exec/src/main/codegen/data/Casts.tdd | 8 ++++----
.../src/main/codegen/templates/CastVarCharDate.java | 2 +-
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a50ab2b9/exec/java-exec/src/main/codegen/data/Casts.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Casts.tdd b/exec/java-exec/src/main/codegen/data/Casts.tdd
index 0dc0090..73cdede 100644
--- a/exec/java-exec/src/main/codegen/data/Casts.tdd
+++ b/exec/java-exec/src/main/codegen/data/Casts.tdd
@@ -57,10 +57,10 @@
{from: "TimeStampTZ", to: "Date", major: "Date"},
{from: "TimeStampTZ", to: "TimeStamp", major: "Date"},
- {from: "VarChar", to: "Date", major: "VarCharDate"},
- {from: "VarChar", to: "TimeStamp", major: "VarCharDate"},
- {from: "VarChar", to: "TimeStampTZ", major: "VarCharDate"},
- {from: "VarChar", to: "Time", major: "VarCharDate"},
+ {from: "VarChar", to: "Date", major: "VarCharDate", alias: "datetype"},
+ {from: "VarChar", to: "TimeStamp", major: "VarCharDate", alias: "timestamptype"},
+ {from: "VarChar", to: "TimeStampTZ", major: "VarCharDate", alias: "timestamptztype"},
+ {from: "VarChar", to: "Time", major: "VarCharDate", alias: "timetype"},
{from: "Date", to: "VarChar", major: "DateVarChar", bufferLength: "10"}
{from: "TimeStamp", to: "VarChar", major: "DateVarChar", bufferLength: "23"},
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a50ab2b9/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
index 249b555..5a3127a 100644
--- a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
+++ b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
@@ -41,7 +41,7 @@ import org.joda.time.DateMidnight;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
@SuppressWarnings("unused")
-@FunctionTemplate(name = "cast${type.to?upper_case}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
+@FunctionTemplate(names = {"cast${type.to?upper_case}", "${type.alias}"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
public class Cast${type.from}To${type.to} implements DrillSimpleFunc {
@Param ${type.from}Holder in;
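With names = {...}, each cast function is additionally registered under its alias, so the conversion can be invoked as an ordinary scalar function, e.g. SELECT timestamptztype('2008-2-23 12:0:0.0 America/Los_Angeles') FROM ... (an illustrative literal; the accepted string formats are whatever Drill's VarChar-to-date parsing supports).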
[03/11] git commit: DRILL-686: Verify the operands while extracting projects that can be pushed into a scan
Posted by ja...@apache.org.
DRILL-686: Verify the operands while extracting projects that can be pushed into a scan
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/508cb5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/508cb5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/508cb5de
Branch: refs/heads/master
Commit: 508cb5dea9deb66661267abd174d54db99dc6bfa
Parents: 49d5333
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 18:07:10 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:46:19 2014 -0700
----------------------------------------------------------------------
.../drill/exec/planner/physical/PrelUtil.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/508cb5de/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index e98b970..bdc8b31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -41,7 +41,6 @@ import org.eigenbase.rex.RexCall;
import org.eigenbase.rex.RexInputRef;
import org.eigenbase.rex.RexLiteral;
import org.eigenbase.rex.RexNode;
-import org.eigenbase.rex.RexOver;
import org.eigenbase.rex.RexVisitorImpl;
import com.beust.jcommander.internal.Lists;
@@ -144,12 +143,17 @@ public class PrelUtil {
@Override
public PathSegment visitCall(RexCall call) {
if ("ITEM".equals(call.getOperator().getName())) {
- return call.operands.get(0).accept(this)
- .cloneWithNewChild(convertLiteral((RexLiteral) call.operands.get(1)));
- }
- // else
- for (RexNode operand : call.operands) {
- addColumn(operand.accept(this));
+ PathSegment mapOrArray = call.operands.get(0).accept(this);
+ if (mapOrArray != null) {
+ if (call.operands.get(1) instanceof RexLiteral) {
+ return mapOrArray.cloneWithNewChild(convertLiteral((RexLiteral) call.operands.get(1)));
+ }
+ return mapOrArray;
+ }
+ } else {
+ for (RexNode operand : call.operands) {
+ addColumn(operand.accept(this));
+ }
}
return null;
}
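The net effect: for a projection such as f['c1'] the ITEM call contributes the child segment c1 under column f, but only when the index operand is a RexLiteral; a non-literal index (say f[someExpr], hypothetical) now degrades to the parent column f, and a null parent no longer reaches the unconditional cast that previously threw.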
[06/11] git commit: DRILL-529: 'atom' rule in Antlr grammar uses wrong token for calculating ExpressionPosition
Posted by ja...@apache.org.
DRILL-529: 'atom' rule in Antlr grammar uses wrong token for calculating ExpressionPosition
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fb973c66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fb973c66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fb973c66
Branch: refs/heads/master
Commit: fb973c66eb538ba5e780fe1f6369f13fa242f1de
Parents: fafb576
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue Apr 15 16:59:47 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:47:57 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/expression/parser/ExprParser.g | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb973c66/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
----------------------------------------------------------------------
diff --git a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
index 5ad7099..9737e5d 100644
--- a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
+++ b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
@@ -278,13 +278,13 @@ xorExpr returns [LogicalExpression e]
unaryExpr returns [LogicalExpression e]
: sign=(Plus|Minus)? Number {$e = ValueExpressions.getNumericExpression($sign.text, $Number.text, pos(($sign != null) ? $sign : $Number)); }
- | Minus atom {$e = FunctionCallFactory.createExpression("u-", pos($atom.start), $atom.e); }
- | Excl atom {$e= FunctionCallFactory.createExpression("!", pos($atom.start), $atom.e); }
+ | Minus atom {$e = FunctionCallFactory.createExpression("u-", pos($Minus), $atom.e); }
+ | Excl atom {$e= FunctionCallFactory.createExpression("!", pos($Excl), $atom.e); }
| atom {$e = $atom.e; }
;
atom returns [LogicalExpression e]
- : Bool {$e = new ValueExpressions.BooleanExpression($Bool.text, pos($atom.start)); }
+ : Bool {$e = new ValueExpressions.BooleanExpression($Bool.text, pos($Bool)); }
| lookup {$e = $lookup.e; }
;
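The practical effect is that the ExpressionPosition recorded for a unary minus, logical not, or boolean literal is now derived from the operator or literal token itself ($Minus, $Excl, $Bool) rather than from the start token of the operand atom, which is the position an error message should point at.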
[04/11] git commit: DRILL-696: Use standard HBase configuration name/value in HBaseStoragePluginConfig
Posted by ja...@apache.org.
DRILL-696: Use standard HBase configuration name/value in HBaseStoragePluginConfig
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6e6c661f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6e6c661f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6e6c661f
Branch: refs/heads/master
Commit: 6e6c661fd8099fb7f385346ba38e2026c84fc67a
Parents: 508cb5d
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun May 11 03:19:43 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:46:38 2014 -0700
----------------------------------------------------------------------
.../store/hbase/HBaseStoragePluginConfig.java | 52 +++++++++++---------
.../org/apache/drill/hbase/BaseHBaseTest.java | 2 +-
.../org/apache/drill/hbase/HBaseTestsSuite.java | 13 +++--
.../hbase/hbase_scan_screen_physical.json | 6 ++-
...base_scan_screen_physical_column_select.json | 6 ++-
...base_scan_screen_physical_family_select.json | 6 ++-
.../src/test/resources/storage-plugins.json | 39 ++-------------
distribution/src/resources/storage-plugins.json | 8 +--
8 files changed, 58 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index 5a434d6..054d65f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -17,45 +17,43 @@
*/
package org.apache.drill.exec.store.hbase;
+import java.util.Map;
+
import org.apache.drill.common.logical.StoragePluginConfigBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
@JsonTypeName("hbase")
public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
- @JsonProperty
- public String zookeeperQuorum;
-
- @JsonProperty
- public int zookeeperPort;
+ private Map<String, String> config;
+ @JsonIgnore
private Configuration hbaseConf;
- private HConnectionKey hbaseConfKey;
@JsonCreator
- public HBaseStoragePluginConfig(@JsonProperty("zookeeperQuorum") String zookeeperQuorum,
- @JsonProperty("zookeeperPort") int zookeeperPort) {
- this.zookeeperQuorum = zookeeperQuorum;
- this.zookeeperPort = zookeeperPort;
-
- this.hbaseConf = HBaseConfiguration.create();
- logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}' node '{}'.",
- zookeeperQuorum, zookeeperPort, hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
- if (zookeeperQuorum != null && zookeeperQuorum.length() != 0) {
- hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum);
- hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
+ public HBaseStoragePluginConfig(@JsonProperty("config") Map<String, String> props) {
+ this.config = props;
+ if (config == null) {
+ config = Maps.newHashMap();
}
- this.hbaseConfKey = new HConnectionKey(hbaseConf);
+ logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}'.",
+ config.get(HConstants.ZOOKEEPER_QUORUM), config.get(HBASE_ZOOKEEPER_PORT));
+ }
+
+ @JsonProperty
+ public Map<String, String> getConfig() {
+ return ImmutableMap.copyOf(config);
}
@Override
@@ -66,24 +64,32 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements
return false;
}
HBaseStoragePluginConfig that = (HBaseStoragePluginConfig) o;
- return this.hbaseConfKey.equals(that.hbaseConfKey);
+ return config.equals(that.config);
}
@Override
public int hashCode() {
- return this.hbaseConfKey != null ? this.hbaseConfKey.hashCode() : 0;
+ return this.config != null ? this.config.hashCode() : 0;
}
@JsonIgnore
public Configuration getHBaseConf() {
+ if (hbaseConf == null) {
+ hbaseConf = HBaseConfiguration.create();
+ if (config != null) {
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ hbaseConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ }
return hbaseConf;
}
@JsonIgnore
@VisibleForTesting
public void setZookeeperPort(int zookeeperPort) {
- this.zookeeperPort = zookeeperPort;
- hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
+ this.config.put(HBASE_ZOOKEEPER_PORT, String.valueOf(zookeeperPort));
+ getHBaseConf().setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
}
}
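Two consequences of the map-based config are worth noting: any standard HBase key can now be passed straight through to HBaseConfiguration (getHBaseConf() copies every entry), and plugin-config equality now compares the raw key/value maps, removing the dependency on HConnectionKey. The resource-file diffs below show the matching JSON shape.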
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 0753a4d..a68cf70 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -76,7 +76,7 @@ public class BaseHBaseTest extends BaseTestQuery {
protected String getPlanText(String planFile, String tableName) throws IOException {
return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8)
- .replaceFirst("\"zookeeperPort\".*:.*\\d+", "\"zookeeperPort\" : " + HBaseTestsSuite.getZookeeperPort())
+ .replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\" : " + HBaseTestsSuite.getZookeeperPort())
.replace("[TABLE_NAME]", tableName);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 36c31b7..32d7aaa 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -40,7 +38,8 @@ import org.junit.runners.Suite.SuiteClasses;
TestHBaseFilterPushDown.class,
TestHBaseProjectPushDown.class})
public class HBaseTestsSuite {
- private static final Log LOG = LogFactory.getLog(HBaseTestsSuite.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class);
+
private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
protected static final String TEST_TABLE_1 = "TestTable1";
@@ -70,11 +69,11 @@ public class HBaseTestsSuite {
}
if (manageHBaseCluster) {
- LOG.info("Starting HBase mini cluster.");
+ logger.info("Starting HBase mini cluster.");
UTIL = new HBaseTestingUtility(conf);
UTIL.startMiniCluster();
hbaseClusterCreated = true;
- LOG.info("HBase mini cluster started.");
+ logger.info("HBase mini cluster started. Zookeeper port: '{}'", getZookeeperPort());
}
admin = new HBaseAdmin(conf);
@@ -104,9 +103,9 @@ public class HBaseTestsSuite {
}
if (hbaseClusterCreated) {
- LOG.info("Shutting down HBase mini cluster.");
+ logger.info("Shutting down HBase mini cluster.");
UTIL.shutdownMiniCluster();
- LOG.info("HBase mini cluster stopped.");
+ logger.info("HBase mini cluster stopped.");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
index 17e17e4..7f9015b 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
@@ -15,8 +15,10 @@
storage:
{
"type":"hbase",
- "zookeeperQuorum" : "localhost",
- "zookeeperPort" : 2181
+ config : {
+ "hbase.zookeeper.quorum" : "localhost",
+ "hbase.zookeeper.property.clientPort" : 2181
+ }
}
},
{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index dc08031..f399f6f 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -15,8 +15,10 @@
storage:
{
"type":"hbase",
- "zookeeperQuorum" : "localhost",
- "zookeeperPort" : 2181
+ config : {
+ "hbase.zookeeper.quorum" : "localhost",
+ "hbase.zookeeper.property.clientPort" : 2181
+ }
},
columns: [
"`f2`.c1", "`f2`.c2", "row_key"
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
index ce027a0..0002164 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
@@ -15,8 +15,10 @@
storage:
{
"type":"hbase",
- "zookeeperQuorum" : "localhost",
- "zookeeperPort" : 2181
+ config : {
+ "hbase.zookeeper.quorum" : "localhost",
+ "hbase.zookeeper.property.clientPort" : 2181
+ }
},
columns: [
"f2"
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/storage-plugins.json b/contrib/storage-hbase/src/test/resources/storage-plugins.json
index 160583a..0e93f7e 100644
--- a/contrib/storage-hbase/src/test/resources/storage-plugins.json
+++ b/contrib/storage-hbase/src/test/resources/storage-plugins.json
@@ -1,40 +1,11 @@
{
"storage":{
- dfs: {
- type: "file",
- connection: "file:///",
- formats: {
- "psv" : {
- type: "text",
- extensions: [ "tbl" ],
- delimiter: "|"
- },
- "csv" : {
- type: "text",
- extensions: [ "csv" ],
- delimiter: ","
- },
- "tsv" : {
- type: "text",
- extensions: [ "tsv" ],
- delimiter: "\t"
- },
- "parquet" : {
- type: "parquet"
- },
- "json" : {
- type: "json"
- }
- }
- },
- cp: {
- type: "file",
- connection: "classpath:///"
- },
hbase : {
type:"hbase",
- zookeeperQuorum : "localhost",
- zookeeperPort : 2181
+ config : {
+ "hbase.zookeeper.quorum" : "localhost",
+ "hbase.zookeeper.property.clientPort" : 2181
+ }
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/distribution/src/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-plugins.json b/distribution/src/resources/storage-plugins.json
index c3f4a4b..3b1cbd0 100644
--- a/distribution/src/resources/storage-plugins.json
+++ b/distribution/src/resources/storage-plugins.json
@@ -59,9 +59,11 @@
/*,
hbase : {
- type:"hbase",
- zookeeperQuorum : "localhost",
- zookeeperPort : 2181
+ type:"hbase",
+ config : {
+ "hbase.zookeeper.quorum" : "localhost",
+ "hbase.zookeeper.property.clientPort" : 2181
+ }
}
*/
}
[10/11] git commit: Disable multiphase aggregation.
Posted by ja...@apache.org.
Disable multiphase aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f7b7b1a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f7b7b1a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f7b7b1a0
Branch: refs/heads/master
Commit: f7b7b1a0211bef4cf663546e1d32cfb2a384222f
Parents: e4d3c7c
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon May 12 17:54:01 2014 -0700
Committer: Aditya Kishore <ad...@maprtech.com>
Committed: Mon May 12 17:54:01 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/planner/physical/PlannerSettings.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f7b7b1a0/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 0f694af..5bead30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -34,7 +34,7 @@ public class PlannerSettings implements FrameworkContext{
public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);
public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
- public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
+ public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", false);
public OptionManager options = null;
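Because planner.enable_multiphase_agg is an ordinary planner option (registered in SystemOptionManager in the DRILL-690 commit below), the feature remains available per session; assuming Drill's usual option syntax, something like ALTER SESSION SET `planner.enable_multiphase_agg` = true; should turn it back on.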
[11/11] git commit: Enable class unloading in unit test.
Posted by ja...@apache.org.
Enable class unloading in unit test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1b20b6e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1b20b6e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1b20b6e9
Branch: refs/heads/master
Commit: 1b20b6e9e9f98dc7fea14e23f80f152b08971628
Parents: f7b7b1a
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon May 12 17:54:31 2014 -0700
Committer: Aditya Kishore <ad...@maprtech.com>
Committed: Mon May 12 17:54:31 2014 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1b20b6e9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f567b64..81054dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -259,7 +259,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
- <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=13096M </argLine>
+ <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=13096M -XX:+CMSClassUnloadingEnabled</argLine>
<forkCount>1</forkCount>
<reuseForks>true</reuseForks>
<additionalClasspathElements>
[09/11] git commit: DRILL-690: Generate 2 phase plans for Hash Aggr and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.
Posted by ja...@apache.org.
DRILL-690: Generate 2 phase plans for Hash Aggr and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.
Added session options for enabling/disabling aggrs, joins, and multiphase aggrs. Modified DrillRuleSets to populate the rules based on options rather than from a static list. Added matches() implementations for the join and aggr rules.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e4d3c7ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e4d3c7ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e4d3c7ca
Branch: refs/heads/master
Commit: e4d3c7ca9e2cc20defb5fae7c0373d982147cabb
Parents: a50ab2b
Author: Aman Sinha <as...@maprtech.com>
Authored: Sun May 11 17:57:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 12:19:22 2014 -0700
----------------------------------------------------------------------
.../exec/planner/logical/DrillRuleSets.java | 55 ++++++++++++----
.../exec/planner/physical/AggPruleBase.java | 18 ++++++
.../exec/planner/physical/HashAggPrule.java | 56 +++++++++++++---
.../exec/planner/physical/HashJoinPrule.java | 5 ++
.../exec/planner/physical/MergeJoinPrule.java | 5 ++
.../exec/planner/physical/PlannerSettings.java | 29 ++++++++-
.../drill/exec/planner/physical/PrelUtil.java | 5 ++
.../exec/planner/physical/StreamAggPrule.java | 67 ++++++++++++--------
.../drill/exec/planner/sql/DrillSqlWorker.java | 12 ++--
.../server/options/SystemOptionManager.java | 7 +-
10 files changed, 204 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 0d6da25..c07fee3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -17,22 +17,14 @@
*/
package org.apache.drill.exec.planner.logical;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import net.hydromatic.optiq.tools.RuleSet;
+import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.physical.*;
-import org.apache.drill.exec.planner.physical.FilterPrule;
-import org.apache.drill.exec.planner.physical.HashAggPrule;
-import org.apache.drill.exec.planner.physical.HashJoinPrule;
-import org.apache.drill.exec.planner.physical.LimitPrule;
-import org.apache.drill.exec.planner.physical.MergeJoinPrule;
-import org.apache.drill.exec.planner.physical.ProjectPrule;
-import org.apache.drill.exec.planner.physical.ScanPrule;
-import org.apache.drill.exec.planner.physical.ScreenPrule;
-import org.apache.drill.exec.planner.physical.SortConvertPrule;
-import org.apache.drill.exec.planner.physical.SortPrule;
-import org.apache.drill.exec.planner.physical.StreamAggPrule;
import org.eigenbase.rel.RelFactories;
import org.eigenbase.rel.rules.MergeProjectRule;
import org.eigenbase.rel.rules.PushFilterPastJoinRule;
@@ -102,6 +94,7 @@ public class DrillRuleSets {
MergeProjectRule.INSTANCE
));
+ /*
public static final RuleSet DRILL_PHYSICAL_MEM = new DrillRuleSet(ImmutableSet.of( //
// DrillScanRule.INSTANCE,
// DrillFilterRule.INSTANCE,
@@ -150,12 +143,50 @@ public class DrillRuleSets {
// PushJoinThroughJoinRule.LEFT, //
// PushSortPastProjectRule.INSTANCE, //
));
-
+*/
public static final RuleSet DRILL_PHYSICAL_DISK = new DrillRuleSet(ImmutableSet.of( //
ProjectPrule.INSTANCE
));
+ public static final RuleSet getPhysicalRules(QueryContext qcontext) {
+ List<RelOptRule> ruleList = new ArrayList<RelOptRule>();
+
+
+ ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
+ ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
+ ruleList.add(SortConvertPrule.INSTANCE);
+ ruleList.add(SortPrule.INSTANCE);
+ ruleList.add(ProjectPrule.INSTANCE);
+ ruleList.add(ScanPrule.INSTANCE);
+ ruleList.add(ScreenPrule.INSTANCE);
+ ruleList.add(ExpandConversionRule.INSTANCE);
+ ruleList.add(FilterPrule.INSTANCE);
+ ruleList.add(LimitPrule.INSTANCE);
+ ruleList.add(WriterPrule.INSTANCE);
+ ruleList.add(PushLimitToTopN.INSTANCE);
+
+ PlannerSettings ps = qcontext.getPlannerSettings();
+
+ if (ps.isHashAggEnabled()) {
+ ruleList.add(HashAggPrule.INSTANCE);
+ }
+
+ if (ps.isStreamAggEnabled()) {
+ ruleList.add(StreamAggPrule.INSTANCE);
+ }
+
+ if (ps.isHashJoinEnabled()) {
+ ruleList.add(HashJoinPrule.INSTANCE);
+ }
+
+ if (ps.isMergeJoinEnabled()) {
+ ruleList.add(MergeJoinPrule.INSTANCE);
+ }
+
+ return new DrillRuleSet(ImmutableSet.copyOf(ruleList));
+ }
+
public static RuleSet create(ImmutableSet<RelOptRule> rules) {
return new DrillRuleSet(rules);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 235018d..3bdcc2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -24,7 +24,9 @@ import net.hydromatic.optiq.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelOptRuleOperand;
import com.google.common.collect.Lists;
@@ -54,4 +56,20 @@ public abstract class AggPruleBase extends RelOptRule {
return groupByFields;
}
+ // Create 2 phase aggr plan for aggregates such as SUM, MIN, MAX
+ // If any of the aggregate functions are not one of these, then we
+ // currently won't generate a 2 phase plan.
+ protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
+ if (! PrelUtil.getPlannerSettings(call.getPlanner()).isMultiPhaseAggEnabled()) {
+ return false;
+ }
+
+ for (AggregateCall aggCall : aggregate.getAggCallList()) {
+ String name = aggCall.getAggregation().getName();
+ if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
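As a concrete illustration (hypothetical schema): SELECT deptId, SUM(sal), MAX(sal) FROM emp GROUP BY deptId is eligible for a two-phase plan, because SUM, MIN, and MAX can each be re-applied to their own partial results; adding AVG(sal) or COUNT(*) to the select list makes create2PhasePlan() return false for the whole aggregate.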
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 22b33ea..859941b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -27,6 +27,7 @@ import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
import org.eigenbase.trace.EigenbaseTrace;
import com.google.common.collect.ImmutableList;
@@ -40,6 +41,11 @@ public class HashAggPrule extends AggPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isHashAggEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = call.rel(1);
@@ -50,30 +56,60 @@ public class HashAggPrule extends AggPruleBase {
return;
}
- DrillDistributionTrait toDist = null;
RelTraitSet traits = null;
try {
if (aggregate.getGroupSet().isEmpty()) {
- toDist = DrillDistributionTrait.SINGLETON;
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
createTransformRequest(call, aggregate, input, traits);
} else {
// hash distribute on all grouping keys
- toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
- ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
+ DrillDistributionTrait distOnAllKeys =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys);
createTransformRequest(call, aggregate, input, traits);
// hash distribute on single grouping key
- toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
- ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
+ DrillDistributionTrait distOnOneKey =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnOneKey);
createTransformRequest(call, aggregate, input, traits);
- ///TODO: 2 phase hash aggregate plan
+ if (create2PhasePlan(call, aggregate)) {
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+ RelNode convertedInput = convert(input, traits);
+
+ if (convertedInput instanceof RelSubset) {
+ RelSubset subset = (RelSubset) convertedInput;
+ for (RelNode rel : subset.getRelList()) {
+ if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+ RelNode newInput = convert(input, traits);
+
+ HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ HashToRandomExchangePrel exch =
+ new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
+
+ HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ call.transformTo(phase2Agg);
+ }
+ }
+ }
+ }
}
} catch (InvalidRelException e) {
tracer.warning(e.toString());
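For each qualifying input distribution, the transform above produces a plan of the following shape (a sketch using the Prel names from the code):

HashAggPrel                  (phase 2: final aggregation)
  HashToRandomExchangePrel   (hash-distributed on all grouping keys)
    HashAggPrel              (phase 1: partial aggregation under the input's existing distribution)
      converted input

Phase 1 reduces rows within each fragment; the exchange then repartitions the partial results on the full grouping-key set so phase 2 sees all partials for a group in one place.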
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 5e3ace2..851877f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -41,6 +41,11 @@ public class HashJoinPrule extends JoinPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isHashJoinEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillJoinRel join = (DrillJoinRel) call.rel(0);
final RelNode left = call.rel(1);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index 30e2a97..30f651c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -46,6 +46,11 @@ public class MergeJoinPrule extends JoinPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isMergeJoinEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillJoinRel join = (DrillJoinRel) call.rel(0);
final RelNode left = join.getLeft();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index ae895da..0f694af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -30,8 +30,13 @@ public class PlannerSettings implements FrameworkContext{
private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
-
- public OptionManager options;
+ public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
+ public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);
+ public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
+ public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
+ public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
+
+ public OptionManager options = null;
public PlannerSettings(OptionManager options){
this.options = options;
@@ -57,6 +62,26 @@ public class PlannerSettings implements FrameworkContext{
this.useDefaultCosting = defcost;
}
+ public boolean isHashAggEnabled() {
+ return options.getOption(HASHAGG.getOptionName()).bool_val;
+ }
+
+ public boolean isStreamAggEnabled() {
+ return options.getOption(STREAMAGG.getOptionName()).bool_val;
+ }
+
+ public boolean isHashJoinEnabled() {
+ return options.getOption(HASHJOIN.getOptionName()).bool_val;
+ }
+
+ public boolean isMergeJoinEnabled() {
+ return options.getOption(MERGEJOIN.getOptionName()).bool_val;
+ }
+
+ public boolean isMultiPhaseAggEnabled() {
+ return options.getOption(MULTIPHASE.getOptionName()).bool_val;
+ }
+
@Override
public <T> T unwrap(Class<T> clazz) {
if(clazz == PlannerSettings.class){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index bdc8b31..9ca9fbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.eigenbase.rel.RelCollation;
import org.eigenbase.rel.RelFieldCollation;
import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.rex.RexCall;
import org.eigenbase.rex.RexInputRef;
@@ -86,6 +87,10 @@ public class PrelUtil {
public static PlannerSettings getSettings(RelOptCluster cluster){
return cluster.getPlanner().getFrameworkContext().unwrap(PlannerSettings.class);
}
+
+ public static PlannerSettings getPlannerSettings(RelOptPlanner planner) {
+ return planner.getFrameworkContext().unwrap(PlannerSettings.class);
+ }
public static PhysicalOperator removeSvIfRequired(PhysicalOperator child, SelectionVectorMode... allowed){
SelectionVectorMode current = child.getSVMode();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 62b9aa5..bccdea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelCollation;
import org.eigenbase.rel.RelCollationImpl;
@@ -34,6 +35,7 @@ import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
import org.eigenbase.trace.EigenbaseTrace;
import com.google.common.collect.ImmutableList;
@@ -48,6 +50,11 @@ public class StreamAggPrule extends AggPruleBase {
}
@Override
+ public boolean matches(RelOptRuleCall call) {
+ return PrelUtil.getPlannerSettings(call.getPlanner()).isStreamAggEnabled();
+ }
+
+ @Override
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = aggregate.getChild();
@@ -82,31 +89,41 @@ public class StreamAggPrule extends AggPruleBase {
// might be causing some problem.
/// TODO: re-enable this plan after resolving the issue.
// createTransformRequest(call, aggregate, input, traits);
-
-
- /*
- // create a 2-phase plan - commented out for now until we resolve planning for 'ANY' distribution
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(DrillDistributionTrait.ANY);
-
- RelNode convertedInput = convert(input, traits);
- StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList());
-
- int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-
- HashToMergeExchangePrel exch =
- new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
- phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
- collation,
- numEndPoints);
-
- StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch,
- aggregate.getGroupSet(),
- aggregate.getAggCallList());
-
- call.transformTo(phase2Agg);
- */
+
+ if (create2PhasePlan(call, aggregate)) {
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+ RelNode convertedInput = convert(input, traits);
+
+ if (convertedInput instanceof RelSubset) {
+ RelSubset subset = (RelSubset) convertedInput;
+ for (RelNode rel : subset.getRelList()) {
+ if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
+ RelNode newInput = convert(input, traits);
+
+ StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+ HashToMergeExchangePrel exch =
+ new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
+ collation,
+ numEndPoints);
+
+ StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList());
+
+ call.transformTo(phase2Agg);
+ }
+ }
+ }
+ }
}
} catch (InvalidRelException e) {
tracer.warning(e.toString());
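The streaming variant mirrors the hash-aggregate transform, but routes the partials through a HashToMergeExchangePrel that carries the collation, so the sort order required by the phase-2 streaming aggregate is preserved across the repartitioning.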
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index bd57785..7477440 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -86,20 +86,22 @@ public class DrillSqlWorker {
.traitDefs(traitDefs) //
.convertletTable(new DrillConvertletTable()) //
.context(context.getPlannerSettings()) //
- .ruleSets(getRules(context.getStorage())) //
+ .ruleSets(getRules(context)) //
.costFactory(costFactory) //
.build();
this.planner = Frameworks.getPlanner(config);
}
- private static RuleSet[] getRules(StoragePluginRegistry storagePluginRegistry) {
+ private static RuleSet[] getRules(QueryContext context) {
+ StoragePluginRegistry storagePluginRegistry = context.getStorage();
if (allRules == null) {
synchronized (DrillSqlWorker.class) {
if (allRules == null) {
- RuleSet dirllPhysicalMem = DrillRuleSets.mergedRuleSets(
- DrillRuleSets.DRILL_PHYSICAL_MEM, storagePluginRegistry.getStoragePluginRuleSet());
- allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, dirllPhysicalMem};
+ RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(
+ DrillRuleSets.getPhysicalRules(context),
+ storagePluginRegistry.getStoragePluginRuleSet());
+ allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, drillPhysicalMem};
}
}
}
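One subtlety: allRules is built once, under double-checked locking, from the first query's context, so later changes to session options do not rebuild the cached rule set. This is why the Prules above also override matches() to consult PlannerSettings on every planning call, keeping the enable/disable options effective per query despite the caching.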
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 98975e4..cfe8e2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -33,7 +33,12 @@ import com.google.common.collect.Maps;
public class SystemOptionManager implements OptionManager{
private final OptionValidator[] VALIDATORS = {
- PlannerSettings.EXCHANGE
+ PlannerSettings.EXCHANGE,
+ PlannerSettings.HASHAGG,
+ PlannerSettings.STREAMAGG,
+ PlannerSettings.HASHJOIN,
+ PlannerSettings.MERGEJOIN,
+ PlannerSettings.MULTIPHASE
};
private DistributedMap<OptionValue> options;