You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/13 02:56:24 UTC

[01/11] git commit: DRILL-692: Add Hive UDFs to Drill SQL operator table.

Repository: incubator-drill
Updated Branches:
  refs/heads/master cdc5daed5 -> 1b20b6e9e


DRILL-692: Add Hive UDFs to Drill SQL operator table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e602b2a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e602b2a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e602b2a9

Branch: refs/heads/master
Commit: e602b2a9cd2e7550be4c2fd38e2b5942dc623d41
Parents: cdc5dae
Author: vkorukanti <ve...@gmail.com>
Authored: Sat May 10 12:24:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:38:56 2014 -0700

----------------------------------------------------------------------
 .../templates/ObjectInspectorHelper.java        | 10 ++-
 .../exec/expr/ExpressionTreeMaterializer.java   |  2 +-
 .../fn/HiveFunctionImplementationRegistry.java  | 17 ++++-
 .../exec/planner/sql/DrillOperatorTable.java    | 13 ++--
 .../drill/exec/planner/sql/HiveUDFOperator.java | 70 ++++++++++++++++++++
 .../drill/exec/physical/impl/TestHiveUDFs.java  | 10 +--
 .../record/ExpressionTreeMaterializerTest.java  |  3 +-
 .../resources/functions/hive/GenericUDF.json    |  6 +-
 .../drill/jdbc/test/TestHiveScalarUDFs.java     | 57 ++++++++++++++++
 9 files changed, 169 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
index 23b969d..22a9eb2 100644
--- a/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ObjectInspectorHelper.java
@@ -70,7 +70,7 @@ public class ObjectInspectorHelper {
             JType holderClass = TypeHelper.getHolderType(m, returnType, TypeProtos.DataMode.OPTIONAL);
             block.assign(returnValueHolder, JExpr._new(holderClass));
 
-          <#if entry.hiveType == "VARCHAR" || entry.hiveType == "STRING">
+          <#if entry.hiveType == "VARCHAR" || entry.hiveType == "STRING" || entry.hiveType == "BINARY">
             block.assign(returnValueHolder.ref("buffer"),
               m.directClass(io.netty.buffer.Unpooled.class.getCanonicalName())
                 .staticInvoke("wrappedBuffer")
@@ -160,6 +160,14 @@ public class ObjectInspectorHelper {
               .invoke("setBytes").arg(JExpr.lit(0)).arg(data));
             jc._else().assign(returnValueHolder.ref("start"), JExpr.lit(0));
             jc._else().assign(returnValueHolder.ref("end"), data.ref("length"));
+          <#elseif entry.hiveType == "BINARY">
+
+            JVar data = jc._else().decl(m.directClass(byte[].class.getCanonicalName()), "data",
+              castedOI.invoke("getPrimitiveJavaObject").arg(returnValue));
+            jc._else().add(returnValueHolder.ref("buffer")
+                .invoke("setBytes").arg(JExpr.lit(0)).arg(data));
+            jc._else().assign(returnValueHolder.ref("start"), JExpr.lit(0));
+            jc._else().assign(returnValueHolder.ref("end"), data.ref("length"));
 
           <#else>
             jc._else().assign(returnValueHolder.ref("value"),

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index fc7fb6a..0267be3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -192,7 +192,7 @@ public class ExpressionTreeMaterializer {
         return new HiveFuncHolderExpr(call.getName(), matchedHiveHolder, call.args, call.getPosition());
 
       logFunctionResolutionError(errorCollector, call);
-      return null;
+      return NullExpression.INSTANCE;
     }
 
     private void logFunctionResolutionError(ErrorCollector errorCollector, FunctionCall call) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
index 634b24f..e5c890e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/HiveFunctionImplementationRegistry.java
@@ -76,9 +76,22 @@ public class HiveFunctionImplementationRegistry {
 
     for(int i=0; i<names.length;i++){
       methods.put(names[i], clazz);
+      if (!names[i].toLowerCase().equals(names[i])) {
+        // After the Optiq-Drill conversion of function calls, function names are in lowercase
+        // and we fail to find them in the map. Add a lowercase name entry.
+        methods.put(names[i].toLowerCase(), clazz);
+      }
     }
   }
 
+  public ArrayListMultimap<String, Class<? extends GenericUDF>> getGenericUDFs() {
+    return methodsGenericUDF;
+  }
+
+  public ArrayListMultimap<String, Class<? extends UDF>> getUDFs() {
+    return methodsUDF;
+  }
+
   /**
    * Find the UDF class for given function name and check if it accepts the given input argument
    * types. If a match is found, create a holder and return
@@ -127,7 +140,7 @@ public class HiveFunctionImplementationRegistry {
         nonDeterministicUDFs.contains(udfClazz));
     } catch(IllegalAccessException | InstantiationException e) {
       logger.debug("Failed to instantiate class", e);
-    } catch(UDFArgumentException e) { /*ignore this*/ }
+    } catch(Exception e) { /*ignore this*/ }
 
     return null;
   }
@@ -147,7 +160,7 @@ public class HiveFunctionImplementationRegistry {
         returnOI,
         Types.optional(ObjectInspectorHelper.getDrillType(returnOI)),
         nonDeterministicUDFs.contains(udfClazz));
-    } catch(UDFArgumentException e) { /*ignore this*/ }
+    } catch(Exception e) { /*ignore this*/ }
 
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index 7c8bce2..772b3b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.eigenbase.sql.SqlFunctionCategory;
@@ -33,7 +34,6 @@ import org.eigenbase.sql.fun.SqlStdOperatorTable;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
-import com.google.hive12.common.collect.Sets;
 
 public class DrillOperatorTable extends SqlStdOperatorTable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOperatorTable.class);
@@ -63,10 +63,15 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
       }
     }
 
-    // TODO: add hive functions.
-  }
-
+    for (String name : Sets.union(
+        registry.getHiveRegistry().getGenericUDFs().asMap().keySet(),
+        registry.getHiveRegistry().getUDFs().asMap().keySet())) {
 
+      SqlOperator op = new HiveUDFOperator(name.toUpperCase());
+      operators.add(op);
+      opMap.put(name, op);
+    }
+  }
 
   @Override
   public void lookupOperatorOverloads(SqlIdentifier opName, SqlFunctionCategory category, SqlSyntax syntax, List<SqlOperator> operatorList) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java
new file mode 100644
index 0000000..71860c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/HiveUDFOperator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.sql;
+
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.sql.SqlCall;
+import org.eigenbase.sql.SqlCallBinding;
+import org.eigenbase.sql.SqlFunction;
+import org.eigenbase.sql.SqlFunctionCategory;
+import org.eigenbase.sql.SqlIdentifier;
+import org.eigenbase.sql.SqlOperandCountRange;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.sql.parser.SqlParserPos;
+import org.eigenbase.sql.type.SqlOperandCountRanges;
+import org.eigenbase.sql.type.SqlOperandTypeChecker;
+import org.eigenbase.sql.type.SqlTypeName;
+import org.eigenbase.sql.validate.SqlValidator;
+import org.eigenbase.sql.validate.SqlValidatorScope;
+
+public class HiveUDFOperator extends SqlFunction {
+
+  public HiveUDFOperator(String name) {
+    super(new SqlIdentifier(name, SqlParserPos.ZERO), DynamicReturnType.INSTANCE, null, new ArgChecker(), null,
+        SqlFunctionCategory.USER_DEFINED_FUNCTION);
+  }
+
+  @Override
+  public RelDataType deriveType(SqlValidator validator, SqlValidatorScope scope, SqlCall call) {
+    return validator.getTypeFactory().createSqlType(SqlTypeName.ANY);
+  }
+
+  /** Argument Checker for variable number of arguments */
+  public static class ArgChecker implements SqlOperandTypeChecker {
+
+    public static ArgChecker INSTANCE = new ArgChecker();
+
+    private SqlOperandCountRange range = SqlOperandCountRanges.any();
+
+    @Override
+    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+      return true;
+    }
+
+    @Override
+    public SqlOperandCountRange getOperandCountRange() {
+      return range;
+    }
+
+    @Override
+    public String getAllowedSignatures(SqlOperator op, String opName) {
+      return opName + "(HiveUDF - Opaque)";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
index eabeda2..b2fa898 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHiveUDFs.java
@@ -101,10 +101,8 @@ public class TestHiveUDFs extends ExecTest {
       NullableVar16CharVector concatV = (NullableVar16CharVector) vv.next();
       Float4Vector flt1V = (Float4Vector) vv.next();
       NullableVar16CharVector format_numberV = (NullableVar16CharVector) vv.next();
-
-      /* DRILL-425
       NullableVar16CharVector nullableStr1V = ((NullableVar16CharVector) vv.next());
-      NullableVar16CharVector upperNullableStr1V = ((NullableVar16CharVector) vv.next()); */
+      NullableVar16CharVector upperNullableStr1V = ((NullableVar16CharVector) vv.next());
 
       for(int i=0; i<exec.getRecordCount(); i++) {
 
@@ -123,14 +121,12 @@ public class TestHiveUDFs extends ExecTest {
 
 
         String nullableStr1 = null;
-        /* DRILL-425
         if (!nullableStr1V.getAccessor().isNull(i))
-          nullableStr1 = new String(nullableStr1V.getAccessor().get(i), Charsets.UTF_16);*/
+          nullableStr1 = new String(nullableStr1V.getAccessor().get(i), Charsets.UTF_16);
 
         String upperNullableStr1 = null;
-        /* DRILL-425
         if (!upperNullableStr1V.getAccessor().isNull(i))
-          upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i), Charsets.UTF_16); */
+          upperNullableStr1 = new String(upperNullableStr1V.getAccessor().get(i), Charsets.UTF_16);
 
         assertEquals(nullableStr1 != null, upperNullableStr1 != null);
         if (nullableStr1 != null)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 8a31a27..d07ce85 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.IfExpression;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.TypedNullConstant;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -204,7 +205,7 @@ public class ExpressionTreeMaterializerTest extends ExecTest {
       ImmutableList.of((LogicalExpression) new FieldReference("test", ExpressionPosition.UNKNOWN) ),
       ExpressionPosition.UNKNOWN);
     LogicalExpression newExpr = ExpressionTreeMaterializer.materialize(functionCallExpr, batch, ec, registry);
-    assertEquals(newExpr, null);
+    assertTrue(newExpr instanceof TypedNullConstant);
     assertEquals(1, ec.getErrorCount());
     System.out.println(ec.toErrorString());
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json b/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
index 7b82570..e849e00 100644
--- a/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
+++ b/exec/java-exec/src/test/resources/functions/hive/GenericUDF.json
@@ -16,7 +16,7 @@
                    {name: "str1", type: "VAR16CHAR", mode: "REQUIRED"},
                    {name: "str2", type: "VAR16CHAR", mode: "REQUIRED"},
                    {name: "str3", type: "VAR16CHAR", mode: "REQUIRED"},
-                   /* DRILL-425 {name: "nullableStr1", type: "VAR16CHAR", mode: "OPTIONAL"}, */
+                   {name: "nullableStr1", type: "VAR16CHAR", mode: "OPTIONAL"},
                    {name: "flt1", type: "FLOAT4", mode: "REQUIRED"}
                 ]}
             ]
@@ -31,9 +31,9 @@
                 { ref: "unix_timestamp", expr: "unix_timestamp()" },
                 { ref: "concat", expr: "concat_ws('-', str2, str3)" },
                 { ref: "flt1", expr: "flt1" },
-                { ref: "format_number", expr: "format_number(cast(flt1 as float8), cast(2 as int))" } /* DRILL-425,
+                { ref: "format_number", expr: "format_number(cast(flt1 as float8), cast(2 as int))" },
                 { ref: "nullableStr1", expr: "nullableStr1" },
-                { ref: "upperNulableStr1", expr: "upper(nullableStr1)" } */
+                { ref: "upperNulableStr1", expr: "upper(nullableStr1)" }
             ]
         },
         {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e602b2a9/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java
new file mode 100644
index 0000000..76af1b0
--- /dev/null
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestHiveScalarUDFs.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.jdbc.test;
+
+
+import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestHiveScalarUDFs {
+
+  @BeforeClass
+  public static void generateHive() throws Exception{
+    new HiveTestDataGenerator().generateTestData();
+  }
+
+  /** Test a Hive function that extends the class {@link org.apache.hadoop.hive.ql.exec.UDF}. */
+  @Test
+  public void simpleUDF() throws Exception {
+    JdbcAssert.withNoDefaultSchema()
+        .sql("SELECT " +
+            "from_unixtime(1237573801) as unix_timestamp, " +
+            "UDFDegrees(cast(26.89 as DOUBLE)) as degrees " +
+            "FROM cp.`employee.json` LIMIT 1")
+        .returns("unix_timestamp=2009-03-20 11:30:01; degrees=1540.6835111067835");
+  }
+
+  /** Test a Hive function that extends the abstract class {@link org.apache.hadoop.hive.ql.udf.generic.GenericUDF}. */
+  @Test
+  public void simpleGenericUDF() throws Exception{
+    JdbcAssert.withNoDefaultSchema()
+        .sql("SELECT CAST(" +
+            "encode('text', 'UTF-8') " +
+            "AS VARCHAR(5)) " +
+            "FROM cp.`employee.json` LIMIT 1")
+        .returns("EXPR$0=text");
+  }
+}


[02/11] git commit: DRILL-683: Qualify HBase scan with specified columns even if row_key is required.

Posted by ja...@apache.org.
DRILL-683: Qualify HBase scan with specified columns even if row_key is required.

 + Added some log messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/49d53335
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/49d53335
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/49d53335

Branch: refs/heads/master
Commit: 49d533355e6cb98c623f68c3b90331c62ecbc270
Parents: e602b2a
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 02:19:36 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:45:59 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/DrillHBaseConstants.java   |  2 ++
 .../exec/store/hbase/HBaseRecordReader.java     | 21 ++++++--------------
 .../exec/store/hbase/HBaseScanBatchCreator.java |  7 +++----
 .../store/hbase/HBaseStoragePluginConfig.java   | 12 +++++++----
 .../drill/exec/store/hbase/HBaseSubScan.java    |  8 ++++----
 ...base_scan_screen_physical_column_select.json |  2 +-
 6 files changed, 24 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
index 7969c45..a86797b 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java
@@ -24,4 +24,6 @@ public interface DrillHBaseConstants {
 
   static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
 
+  static final String HBASE_ZOOKEEPER_PORT = "hbase.zookeeper.property.clientPort";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index af059f5..381cd6a 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -22,8 +22,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -43,6 +41,7 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
@@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
@@ -70,7 +68,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private Result leftOver;
   private VarBinaryVector rowKeyVector;
   private SchemaPath rowKeySchemaPath;
-  private HTable table;
 
   public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
@@ -110,15 +107,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
 
     try {
-      if (rowKeySchemaPath != null) {
-        /* if ROW_KEY was requested, we can not qualify the scan with columns,
-         * otherwise HBase will omit the entire row of all of the specified columns do
-         * not exist for that row. Eventually we may want to use Family and/or Qualifier
-         * Filters in such case but that would mean additional processing at server.
-         */
-        scan.setFamilyMap(new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR));
-      }
-
       Filter scanFilter = subScanSpec.getScanFilter();
       if (rowKeyOnly) {
         /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
@@ -134,12 +122,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       scan.setFilter(scanFilter);
       scan.setCaching(TARGET_RECORD_COUNT);
 
-      table = new HTable(conf, subScanSpec.getTableName());
+      logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
+                   subScanSpec.getTableName(), conf.get(HConstants.ZOOKEEPER_QUORUM),
+                   conf.get(HBASE_ZOOKEEPER_PORT), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      HTable table = new HTable(conf, subScanSpec.getTableName());
       resultScanner = table.getScanner(scan);
       try {
         table.close();
       } catch (IOException e) {
-        logger.warn("Failure while closing HBase table", e);
+        logger.warn("Failure while closing HBase table: " + subScanSpec.getTableName(), e);
       }
     } catch (IOException e1) {
       throw new DrillRuntimeException(e1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 0a4eabe..661e1b4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -37,11 +36,10 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
   public RecordBatch getBatch(FragmentContext context, HBaseSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
-    Configuration config = ((HBaseStoragePluginConfig) subScan.getStorageConfig()).getHBaseConf();
-    for(HBaseSubScan.HBaseSubScanSpec e : subScan.getRegionScanSpecList()){
+    for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
       try {
         readers.add(
-            new HBaseRecordReader(config, e, subScan.getColumns(), context)
+            new HBaseRecordReader(subScan.getStorageConfig().getHBaseConf(), scanSpec, subScan.getColumns(), context)
         );
       } catch (Exception e1) {
         throw new ExecutionSetupException(e1);
@@ -49,4 +47,5 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
     }
     return new ScanBatch(subScan, context, readers.iterator());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index b6ff069..5a434d6 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase;
 import org.apache.drill.common.logical.StoragePluginConfigBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -29,7 +30,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 
 @JsonTypeName("hbase")
-public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
+public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
 
   @JsonProperty
   public String zookeeperQuorum;
@@ -47,9 +49,11 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
     this.zookeeperPort = zookeeperPort;
 
     this.hbaseConf = HBaseConfiguration.create();
+    logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}' node '{}'.",
+        zookeeperQuorum, zookeeperPort, hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     if (zookeeperQuorum != null && zookeeperQuorum.length() != 0) {
-      hbaseConf.set("hbase.zookeeper.quorum", zookeeperQuorum);
-      hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum);
+      hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
     }
     this.hbaseConfKey = new HConnectionKey(hbaseConf);
   }
@@ -79,7 +83,7 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase {
   @VisibleForTesting
   public void setZookeeperPort(int zookeeperPort) {
     this.zookeeperPort = zookeeperPort;
-    hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
+    hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 6b87817..3f20087 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -48,7 +48,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class);
 
   @JsonProperty
-  public final StoragePluginConfig storage;
+  public final HBaseStoragePluginConfig storage;
   @JsonIgnore
   private final HBaseStoragePlugin hbaseStoragePlugin;
   private final List<HBaseSubScanSpec> regionScanSpecList;
@@ -61,7 +61,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
     hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
     this.regionScanSpecList = regionScanSpecList;
-    this.storage = storage;
+    this.storage = (HBaseStoragePluginConfig) storage;
     this.columns = columns;
   }
 
@@ -78,7 +78,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   }
 
   @JsonIgnore
-  public StoragePluginConfig getStorageConfig() {
+  public HBaseStoragePluginConfig getStorageConfig() {
     return storage;
   }
 
@@ -114,7 +114,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HBaseSubScan(hbaseStoragePlugin, (HBaseStoragePluginConfig) storage, regionScanSpecList, columns);
+    return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/49d53335/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index c64dc97..dc08031 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -19,7 +19,7 @@
       "zookeeperPort" : 2181
     },
     columns: [
-      "`f2`.c1", "`f2`.c2"
+      "`f2`.c1", "`f2`.c2", "row_key"
     ]
   },
   {


[05/11] git commit: DRILL-695: Push down column value predicates into HBase scan

Posted by ja...@apache.org.
DRILL-695: Push down column value predicates into HBase scan


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fafb5761
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fafb5761
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fafb5761

Branch: refs/heads/master
Commit: fafb57615d4efba8a31516c66d86df5926d3595f
Parents: 6e6c661
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 17:20:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:47:00 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseFilterBuilder.java    | 215 +++++++++++++++----
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  23 ++
 .../store/hbase/HBasePushFilterIntoScan.java    |  34 ++-
 .../exec/store/hbase/HBaseRecordReader.java     |  15 +-
 .../drill/exec/store/hbase/HBaseUtils.java      |  71 +++++-
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  11 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   7 +-
 .../drill/hbase/TestHBaseFilterPushDown.java    |  45 +++-
 .../drill/hbase/TestHBaseProjectPushDown.java   |   2 +-
 .../apache/drill/hbase/TestTableGenerator.java  |   6 +-
 10 files changed, 353 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
index b39ee1d..0e0ccf5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -20,16 +20,20 @@ package org.apache.drill.exec.store.hbase;
 import java.util.Arrays;
 
 import org.apache.drill.common.expression.CastExpression;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions.QuotedString;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -37,41 +41,43 @@ import com.google.common.collect.ImmutableMap.Builder;
 
 public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
 
-  private static final ImmutableMap<String, String> RELATIONAL_FUNCTIONS_TRANSPOSE_MAP;
-  static {
-   Builder<String, String> builder = ImmutableMap.builder();
-   RELATIONAL_FUNCTIONS_TRANSPOSE_MAP = builder
-       .put("equal", "equal")
-       .put("not_equal", "not_equal")
-       .put("greater_than_or_equal_to", "less_than_or_equal_to")
-       .put("greater_than", "less_than")
-       .put("less_than_or_equal_to", "greater_than_or_equal_to")
-       .put("less_than", "greater_than")
-       .build();
+  final private HBaseGroupScan groupScan;
+
+  final private LogicalExpression le;
+
+  private boolean allExpressionsConverted = true;
+
+  HBaseFilterBuilder(HBaseGroupScan groupScan, LogicalExpression le) {
+    this.groupScan = groupScan;
+    this.le = le;
   }
 
-  private HBaseScanSpec scanSpec;
+  public HBaseScanSpec parseTree() {
+    return mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), le.accept(this, null));
+  }
 
-  HBaseFilterBuilder(HBaseScanSpec hbaseScanSpec) {
-    this.scanSpec = hbaseScanSpec;
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
   }
 
-  static HBaseScanSpec getHBaseScanSpec(HBaseScanSpec hbaseScanSpec, LogicalExpression e) {
-    HBaseFilterBuilder filterBuilder = new HBaseFilterBuilder(hbaseScanSpec);
-    return e.accept(filterBuilder, null);
+  @Override
+  public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
   }
 
   @Override
-  public HBaseScanSpec visitFunctionCall(FunctionCall call, Void filterSet) throws RuntimeException {
+  public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+    HBaseScanSpec nodeScanSpec = null;
     String functionName = call.getName();
     ImmutableList<LogicalExpression> args = call.args;
-    if (args.size() == 2 && RELATIONAL_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)) {
+    if (COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)) {
       LogicalExpression nameArg = args.get(0);
       LogicalExpression valueArg = args.get(1);
       if (nameArg instanceof QuotedString) {
         valueArg = nameArg;
         nameArg = args.get(1);
-        functionName = RELATIONAL_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+        functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
       }
 
       while (nameArg instanceof CastExpression
@@ -79,54 +85,171 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
         nameArg = ((CastExpression) nameArg).getInput();
       }
 
-      if (nameArg instanceof FieldReference
-          && ((FieldReference) nameArg).getAsUnescapedPath().equals(ROW_KEY)
-          && valueArg instanceof QuotedString) {
-        return createHBaseScanSpec(functionName , ((QuotedString) valueArg).value.getBytes());
+      if (nameArg instanceof SchemaPath && valueArg instanceof QuotedString) {
+        nodeScanSpec = createHBaseScanSpec(functionName, (SchemaPath) nameArg, ((QuotedString) valueArg).value.getBytes());
+      }
+    } else {
+      switch (functionName) {
+      case "booleanAnd":
+      case "booleanOr":
+        HBaseScanSpec leftScanSpec = args.get(0).accept(this, null);
+        HBaseScanSpec rightScanSpec = args.get(1).accept(this, null);
+        if (leftScanSpec != null && rightScanSpec != null) {
+          nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec, rightScanSpec);
+        } else {
+          allExpressionsConverted = false;
+          if ("booleanAnd".equals(functionName)) {
+            nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+          }
+        }
+        break;
+      case "isnotnull":
+      case "isNotNull":
+      case "is not null":
+      case "isnull":
+      case "isNull":
+      case "is null":
+        /*
+         * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
+         * causes a filter with NullComparator to fail. Enable only if specified in
+         * the configuration (after ensuring that the HBase cluster has the fix).
+         */
+        if (groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false)) {
+          if (args.get(0) instanceof SchemaPath) {
+            nodeScanSpec = createHBaseScanSpec(functionName, ((SchemaPath) args.get(0)), null);
+          }
+        }
       }
     }
-    return null;
+
+    if (nodeScanSpec == null) {
+      allExpressionsConverted = false;
+    }
+    return nodeScanSpec;
+  }
+
+  /**
+   * Combines two scan specs into a new one for the table of the current group scan.
+   * <p>
+   * For "booleanAnd" the filters are ANDed and the row range is intersected
+   * (larger start row, smaller stop row). For "booleanOr" the filters are ORed and
+   * the row range is widened (smaller start row, larger stop row). Any other
+   * function name yields a spec with no filter and the empty start/stop rows.
+   *
+   * @param functionName "booleanAnd" or "booleanOr"
+   * @param leftScanSpec left operand; dereferenced, so it must not be null
+   * @param rightScanSpec right operand; dereferenced, so it must not be null
+   * @return the newly created, merged scan spec
+   */
+  private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) {
+    Filter newFilter = null;
+    byte[] startRow = HConstants.EMPTY_START_ROW;
+    byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+    switch (functionName) {
+    case "booleanAnd":
+      // the right-hand filter is appended as the last member of the MUST_PASS_ALL list
+      newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.filter, HBaseUtils.LAST_FILTER, rightScanSpec.filter);
+      startRow = HBaseUtils.maxOfStartRows(leftScanSpec.startRow, rightScanSpec.startRow);
+      stopRow = HBaseUtils.minOfStopRows(leftScanSpec.stopRow, rightScanSpec.stopRow);
+      break;
+    case "booleanOr":
+      // the right-hand filter is appended as the last member of the MUST_PASS_ONE list
+      newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.filter, HBaseUtils.LAST_FILTER, rightScanSpec.filter);
+      startRow = HBaseUtils.minOfStartRows(leftScanSpec.startRow, rightScanSpec.startRow);
+      stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.stopRow, rightScanSpec.stopRow);
+    }
+    return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter);
   }
 
-  private HBaseScanSpec createHBaseScanSpec(String functionName, byte[] value) {
-    byte[] startRow = scanSpec.getStartRow();
-    byte[] stopRow = scanSpec.getStopRow();
-    Filter filter = null;
+  /**
+   * Builds the scan spec for a single comparison or null test on one field.
+   * <p>
+   * A row_key comparison narrows the start/stop rows and adds a {@link RowFilter};
+   * a comparison on a qualified column (family with a named child) adds a
+   * {@link SingleColumnValueFilter} and leaves the row range open.
+   *
+   * @param functionName name of the comparison or null-test function
+   * @param field the referenced field: the row_key or a family.qualifier path
+   * @param fieldValue bytes to compare against; unused for the null tests
+   * @return the scan spec, or {@code null} if the field is neither the row_key nor
+   *         a qualified column, if a null test targets the row_key, or if the
+   *         function name is not handled
+   */
+  private HBaseScanSpec createHBaseScanSpec(String functionName, SchemaPath field, byte[] fieldValue) {
+    boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
+    if (!(isRowKey
+        || (field.getRootSegment().getChild() != null && field.getRootSegment().getChild().isNamed()))) {
+      /*
+       * if the field in this function is neither the row_key nor a qualified HBase column, return.
+       */
+      return null;
+    }
+
+    CompareOp compareOp = null;
+    boolean isNullTest = false;
+    WritableByteArrayComparable comparator = new BinaryComparator(fieldValue);
+    byte[] startRow = HConstants.EMPTY_START_ROW;
+    byte[] stopRow = HConstants.EMPTY_END_ROW;
     switch (functionName) {
     case "equal":
-      startRow = stopRow = value;
+      compareOp = CompareOp.EQUAL;
+      if (isRowKey) {
+        startRow = fieldValue;
+        /*
+         * The stop row of a scan is exclusive, so it has to be just greater than 'value'.
+         * Using 'value' itself only works while start == stop; once this spec is OR-merged
+         * with another range the matching row would fall outside [start, stop).
+         */
+        stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+      }
       break;
     case "not_equal":
-      filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(value));
+      compareOp = CompareOp.NOT_EQUAL;
       break;
     case "greater_than_or_equal_to":
-      startRow = value;
-      filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(value));
+      compareOp = CompareOp.GREATER_OR_EQUAL;
+      if (isRowKey) {
+        startRow = fieldValue;
+      }
       break;
     case "greater_than":
-      startRow = value;
-      filter = new RowFilter(CompareOp.GREATER, new BinaryComparator(value));
+      compareOp = CompareOp.GREATER;
+      if (isRowKey) {
+        startRow = fieldValue;
+      }
       break;
     case "less_than_or_equal_to":
-      stopRow = Arrays.copyOf(value, value.length+1); // stopRow should be just greater than 'value'
-      filter = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(value));
+      compareOp = CompareOp.LESS_OR_EQUAL;
+      if (isRowKey) {
+        // stopRow should be just greater than 'value'
+        stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+      }
       break;
     case "less_than":
-      stopRow = value;
-      filter = new RowFilter(CompareOp.LESS, new BinaryComparator(value));
+      compareOp = CompareOp.LESS;
+      if (isRowKey) {
+        stopRow = fieldValue;
+      }
+      break;
+    case "isnull":
+    case "isNull":
+    case "is null":
+      if (isRowKey) {
+        return null;
+      }
+      isNullTest = true;
+      compareOp = CompareOp.EQUAL;
+      comparator = new NullComparator();
       break;
-    default:
+    case "isnotnull":
+    case "isNotNull":
+    case "is not null":
+      if (isRowKey) {
+        return null;
+      }
+      compareOp = CompareOp.NOT_EQUAL;
+      comparator = new NullComparator();
       break;
     }
-    if (filter != null || startRow != scanSpec.getStartRow() || stopRow != scanSpec.getStopRow()) {
-      return new HBaseScanSpec(scanSpec.getTableName(), startRow, stopRow, filter);
+
+    // identity comparison is intended: it detects whether a case above replaced the default rows
+    if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
+      Filter filter = null;
+      if (isRowKey) {
+        if (compareOp != null) {
+          filter = new RowFilter(compareOp, comparator);
+        }
+      } else {
+        byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
+        byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
+        filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
+        ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
+        if (!isNullTest) {
+          // rows lacking the column are dropped, except for "is null" where they must be kept
+          ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
+        }
+      }
+      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
     }
+    // else
     return null;
   }
 
-  @Override
-  public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
-    return null;
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+   Builder<String, String> builder = ImmutableMap.builder();
+   COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+       .put("equal", "equal")
+       .put("not_equal", "not_equal")
+       .put("greater_than_or_equal_to", "less_than_or_equal_to")
+       .put("greater_than", "less_than")
+       .put("less_than_or_equal_to", "greater_than_or_equal_to")
+       .put("less_than", "greater_than")
+       .build();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index de60741..809aa86 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
@@ -74,6 +75,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   private NavigableMap<HRegionInfo,ServerName> regionsToScan;
   private HTableDescriptor hTableDesc;
 
+  private boolean filterPushedDown = false;
+
   @JsonCreator
   public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
                         @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@@ -258,6 +261,16 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     return storagePlugin;
   }
 
+  /**
+   * Returns the HBase configuration obtained from this scan's storage plugin config.
+   * Excluded from JSON serialization.
+   */
+  @JsonIgnore
+  public Configuration getHBaseConf() {
+    return getStorageConfig().getHBaseConf();
+  }
+
+  /**
+   * Returns the table name recorded in this scan's HBase scan spec.
+   * Excluded from JSON serialization.
+   */
+  @JsonIgnore
+  public String getTableName() {
+    return getHBaseScanSpec().getTableName();
+  }
+
   @Override
   public String getDigest() {
     return toString();
@@ -290,4 +303,14 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     return true;
   }
 
+  /**
+   * Records whether a filter has already been pushed down into this group scan.
+   *
+   * @param b {@code true} to mark the scan as already carrying a pushed-down filter,
+   *          {@code false} to clear the mark
+   */
+  @JsonIgnore
+  public void setFilterPushedDown(boolean b) {
+    // honor the argument instead of unconditionally setting the flag to true
+    this.filterPushedDown = b;
+  }
+
+  /**
+   * Returns the flag that marks this group scan as already carrying a pushed-down filter.
+   * Excluded from JSON serialization.
+   */
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 50b2813..2b419d4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -19,16 +19,18 @@
 package org.apache.drill.exec.store.hbase;
 
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.FilterPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.rex.RexNode;
 
+import com.google.common.collect.ImmutableList;
+
 public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
   public static final StoragePluginOptimizerRule INSTANCE = new HBasePushFilterIntoScan();
 
@@ -43,14 +45,36 @@ public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
     final RexNode condition = filter.getCondition();
 
     HBaseGroupScan groupScan = (HBaseGroupScan)scan.getGroupScan();
+    if (groupScan.isFilterPushedDown()) {
+      /*
+       * The rule can get triggered again due to the transformed "scan => filter" sequence
+       * created by the earlier execution of this rule when we could not do a complete
+       * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+       * this flag to not do a re-processing of the rule on the already transformed call.
+       */
+      return;
+    }
+
     LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(), scan, condition);
-    HBaseScanSpec newScanSpec = HBaseFilterBuilder.getHBaseScanSpec(groupScan.getHBaseScanSpec(), conditionExp);
+    HBaseFilterBuilder hbaseFilterBuilder = new HBaseFilterBuilder(groupScan, conditionExp);
+    HBaseScanSpec newScanSpec = hbaseFilterBuilder.parseTree();
     if (newScanSpec == null) {
-      return; //no filter pushdown ==> No transformation. 
+      return; //no filter pushdown ==> No transformation.
     }
-    final GroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+
+    final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+    newGroupsScan.setFilterPushedDown(true);
+
     final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-    call.transformTo(newScanPrel);
+    if (hbaseFilterBuilder.isAllExpressionsConverted()) {
+      /*
+       * Since we could convert the entire filter condition expression into an HBase filter,
+       * we can eliminate the filter operator altogether.
+       */
+      call.transformTo(newScanPrel);
+    } else {
+      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of((RelNode)newScanPrel)));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 381cd6a..58285db 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -47,9 +47,6 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 
 import com.google.common.base.Stopwatch;
@@ -107,19 +104,17 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
 
     try {
-      Filter scanFilter = subScanSpec.getScanFilter();
+      scan.setFilter(subScanSpec.getScanFilter());
       if (rowKeyOnly) {
         /* if only the row key was requested, add a FirstKeyOnlyFilter to the scan
          * to fetch only one KV from each row. If a filter is already part of this
-         * scan, add the FirstKeyOnlyFilter as the SECOND filter of a MUST_PASS_ALL
+         * scan, add the FirstKeyOnlyFilter as the LAST filter of a MUST_PASS_ALL
          * FilterList.
          */
-        Filter firstKeyFilter = new FirstKeyOnlyFilter();
-        scanFilter = (scanFilter == null)
-            ? firstKeyFilter
-            : new FilterList(Operator.MUST_PASS_ALL, scanFilter, firstKeyFilter);
+        scan.setFilter(
+            HBaseUtils.andFilterAtIndex(scan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter())
+        );
       }
-      scan.setFilter(scanFilter);
       scan.setCaching(TARGET_RECORD_COUNT);
 
       logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
index bebdda7..10a7e64 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseUtils.java
@@ -23,17 +23,24 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.charset.CharacterCodingException;
+import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.ParseFilter;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.collect.Lists;
+
 public class HBaseUtils {
   static final ParseFilter FILTER_PARSEER = new ParseFilter();
 
+  static final int FIRST_FILTER = 0;
+  static final int LAST_FILTER = -1;
+
   public static byte[] getBytes(String str) {
     return str == null ? HConstants.EMPTY_BYTE_ARRAY : Bytes.toBytes(str);
   }
@@ -41,7 +48,7 @@ public class HBaseUtils {
   static Filter parseFilterString(String filterString) {
     if (filterString == null) return null;
     try {
-        return FILTER_PARSEER.parseFilterString(filterString);
+      return FILTER_PARSEER.parseFilterString(filterString);
     } catch (CharacterCodingException e) {
       throw new DrillRuntimeException("Error parsing filter string: " + filterString, e);
     }
@@ -66,4 +73,66 @@ public class HBaseUtils {
     }
   }
 
+  /**
+   * ANDs {@code newFilter} with {@code currentFilter}.
+   * <p>
+   * If either argument is null the other one is returned unchanged. Otherwise a new
+   * MUST_PASS_ALL {@link FilterList} is built: when {@code currentFilter} is itself a
+   * MUST_PASS_ALL list its members are copied into the new list (no nesting),
+   * otherwise it becomes the single existing member.
+   *
+   * @param currentFilter the existing filter, may be null
+   * @param index position at which to insert {@code newFilter}, or
+   *              {@code LAST_FILTER} to append it
+   * @param newFilter the filter to add, may be null
+   * @return the combined filter
+   */
+  public static Filter andFilterAtIndex(Filter currentFilter, int index, Filter newFilter) {
+    if (currentFilter == null) {
+      return newFilter;
+    } else if (newFilter == null) {
+      return currentFilter;
+    }
+
+    List<Filter> allFilters = Lists.newArrayList();
+    if (currentFilter instanceof FilterList && ((FilterList)currentFilter).getOperator() == FilterList.Operator.MUST_PASS_ALL) {
+      allFilters.addAll(((FilterList)currentFilter).getFilters());
+    } else {
+      allFilters.add(currentFilter);
+    }
+    // LAST_FILTER is a sentinel (-1) that maps to "append at the end"
+    allFilters.add((index == LAST_FILTER ? allFilters.size() : index), newFilter);
+    return new FilterList(FilterList.Operator.MUST_PASS_ALL, allFilters);
+  }
+
+  /**
+   * ORs {@code newFilter} with {@code currentFilter}.
+   * <p>
+   * If either argument is null the other one is returned unchanged. Otherwise a new
+   * MUST_PASS_ONE {@link FilterList} is built: when {@code currentFilter} is itself a
+   * MUST_PASS_ONE list its members are copied into the new list (no nesting),
+   * otherwise it becomes the single existing member.
+   *
+   * @param currentFilter the existing filter, may be null
+   * @param index position at which to insert {@code newFilter}, or
+   *              {@code LAST_FILTER} to append it
+   * @param newFilter the filter to add, may be null
+   * @return the combined filter
+   */
+  public static Filter orFilterAtIndex(Filter currentFilter, int index, Filter newFilter) {
+    if (currentFilter == null) {
+      return newFilter;
+    } else if (newFilter == null) {
+      return currentFilter;
+    }
+
+    List<Filter> allFilters = Lists.newArrayList();
+    if (currentFilter instanceof FilterList && ((FilterList)currentFilter).getOperator() == FilterList.Operator.MUST_PASS_ONE) {
+      allFilters.addAll(((FilterList)currentFilter).getFilters());
+    } else {
+      allFilters.add(currentFilter);
+    }
+    // LAST_FILTER is a sentinel (-1) that maps to "append at the end"
+    allFilters.add((index == LAST_FILTER ? allFilters.size() : index), newFilter);
+    return new FilterList(FilterList.Operator.MUST_PASS_ONE, allFilters);
+  }
+
+  /**
+   * Returns the greater of two start rows. A null or empty row never wins over the
+   * other argument: if {@code left} is null/empty, {@code right} is returned as is;
+   * if only {@code right} is null/empty, {@code left} is returned.
+   */
+  public static byte[] maxOfStartRows(byte[] left, byte[] right) {
+    final boolean leftMissing = (left == null || left.length == 0);
+    if (leftMissing) {
+      return right;
+    }
+    final boolean rightMissing = (right == null || right.length == 0);
+    if (rightMissing) {
+      return left;
+    }
+    if (Bytes.compareTo(left, right) > 0) {
+      return left;
+    }
+    return right;
+  }
+
+  /**
+   * Returns the smaller of two start rows. If either row is null or empty the
+   * result is the empty byte array.
+   */
+  public static byte[] minOfStartRows(byte[] left, byte[] right) {
+    final boolean bothPresent = left != null && left.length != 0
+        && right != null && right.length != 0;
+    if (!bothPresent) {
+      return HConstants.EMPTY_BYTE_ARRAY;
+    }
+    if (Bytes.compareTo(left, right) < 0) {
+      return left;
+    }
+    return right;
+  }
+
+  /**
+   * Returns the greater of two stop rows. If either row is null or empty the
+   * result is the empty byte array.
+   */
+  public static byte[] maxOfStopRows(byte[] left, byte[] right) {
+    final boolean bothPresent = left != null && left.length != 0
+        && right != null && right.length != 0;
+    if (!bothPresent) {
+      return HConstants.EMPTY_BYTE_ARRAY;
+    }
+    if (Bytes.compareTo(left, right) > 0) {
+      return left;
+    }
+    return right;
+  }
+
+  /**
+   * Returns the smaller of two stop rows. A null or empty row never wins over the
+   * other argument: if {@code left} is null/empty, {@code right} is returned as is;
+   * if only {@code right} is null/empty, {@code left} is returned.
+   */
+  public static byte[] minOfStopRows(byte[] left, byte[] right) {
+    final boolean leftMissing = (left == null || left.length == 0);
+    if (leftMissing) {
+      return right;
+    }
+    final boolean rightMissing = (right == null || right.length == 0);
+    if (rightMissing) {
+      return left;
+    }
+    if (Bytes.compareTo(left, right) < 0) {
+      return left;
+    }
+    return right;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index a68cf70..f170025 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -87,7 +87,8 @@ public class BaseHBaseTest extends BaseTestQuery {
   }
 
   protected void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
-    sql = sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1);
+    sql = canonizeSQL(sql);
+    System.out.println("Running query:\n" + sql);
     List<QueryResultBatch> results = testSqlWithResults(sql);
     printResultAndVerifyRowCount(results, expectedRowCount);
   }
@@ -106,7 +107,13 @@ public class BaseHBaseTest extends BaseTestQuery {
       result.release();
     }
     System.out.println("Total record count: " + rowCount);
-    Assert.assertEquals(expectedRowCount, rowCount);
+    if (expectedRowCount != -1) {
+      Assert.assertEquals(expectedRowCount, rowCount);
+    }
+  }
+
+  /**
+   * Replaces every "[TABLE_NAME]" placeholder in the query text with the name of
+   * the first test table of the suite.
+   */
+  protected String canonizeSQL(String sql) {
+    return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 32d7aaa..0e9d1cf 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -124,7 +124,12 @@ public class HBaseTestsSuite {
   }
 
   private static void createTestTables() throws Exception {
-    TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 2);
+    /*
+     * We are seeing some issues with the (Drill) Filter operator if a group scan spans
+     * multiple fragments. Hence the number of regions in the HBase table is set to 1.
+     * Will revert to multiple regions once the issue is resolved.
+     */
+    TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 1);
   }
 
   private static void cleanupTestTables() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 6e64c4a..6dd418c 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,37 +17,64 @@
  */
 package org.apache.drill.hbase;
 
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestHBaseFilterPushDown extends BaseHBaseTest {
 
   @Test
-  public void testFilterPushDownRowKeyEqual() throws Exception{
+  public void testFilterPushDownRowKeyEqual() throws Exception {
     runSQLVerifyCount("SELECT\n"
-        + "  tableName.*\n"
+        + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
-        + "  WHERE tableName.row_key = 'b4'"
+        + "WHERE\n"
+        + "  row_key = 'b4'"
         , 1);
   }
 
   @Test
-  public void testFilterPushDownRowKeyGreaterThan() throws Exception{
+  public void testFilterPushDownRowKeyGreaterThan() throws Exception {
     runSQLVerifyCount("SELECT\n"
-        + "  tableName.*\n"
+        + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
-        + "  WHERE tableName.row_key > 'b4'"
+        + "WHERE\n"
+        + "  row_key > 'b4'"
         , 2);
   }
 
   @Test
-  public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception{
+  public void testFilterPushDownMultiColumns() throws Exception {
     runSQLVerifyCount("SELECT\n"
-        + "  tableName.*\n"
+        + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
-        + "  WHERE 'b4' >= tableName.row_key"
+        + "WHERE\n"
+        + "  (row_key >= 'b5' OR row_key <= 'a2') AND (f['c1'] >= '1' OR f['c1'] is null)"
+        , 4);
+  }
+
+  @Test
+  @Ignore("Until convert_from() functions are working.")
+  public void testFilterPushDownConvertExpression() throws Exception {
+    runSQLVerifyCount("SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "WHERE\n"
+        + "  convert_from(row_key, 'INT_BE') > 12"
+        , -1);
+  }
+
+  @Test
+  public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception {
+    runSQLVerifyCount("SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "WHERE\n"
+        + "  'b4' >= row_key"
         , 4);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index 0d91454..88194d5 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -43,7 +43,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   public void testRowKeyAndColumnPushDown() throws Exception{
     setColumnWidth(9);
     runSQLVerifyCount("SELECT\n"
-        + "row_key, f['c1']*31 as `f[c1]*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n"
+        + "row_key, f['c1']*31 as `f['c1']*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
         , 6);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fafb5761/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
index 487c306..3678c78 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
@@ -42,7 +42,11 @@ public class TestTableGenerator {
     HTableDescriptor desc = new HTableDescriptor(tableName);
     desc.addFamily(new HColumnDescriptor("f"));
     desc.addFamily(new HColumnDescriptor("f2"));
-    admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
+    if (numberRegions > 1) {
+      admin.createTable(desc, Arrays.copyOfRange(SPLIT_KEYS, 0, numberRegions-1));
+    } else {
+      admin.createTable(desc);
+    }
 
     HTable table = new HTable(admin.getConfiguration(), tableName);
 


[07/11] git commit: DRILL-567: Maven RAT plugin - Ignore all files inside a hidden directory ".*/**"

Posted by ja...@apache.org.
DRILL-567: Maven RAT plugin - Ignore all files inside a hidden directory ".*/**"


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8a290ea6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8a290ea6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8a290ea6

Branch: refs/heads/master
Commit: 8a290ea6fe873db446970293679ed0d81ee55911
Parents: fb973c6
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Thu Apr 24 14:32:54 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:48:17 2014 -0700

----------------------------------------------------------------------
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a290ea6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d135c56..f567b64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
             <exclude>**/*.iml</exclude>
             <exclude>**/*.tdd</exclude>
             <exclude>**/*.project</exclude>
+            <exclude>.*/**</exclude>
           </excludes>
         </configuration>
       </plugin>


[08/11] git commit: DRILL-691: Function alias to be able to cast to TIMESTAMPTZ data type

Posted by ja...@apache.org.
DRILL-691: Function alias to be able to cast to TIMESTAMPTZ data type


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a50ab2b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a50ab2b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a50ab2b9

Branch: refs/heads/master
Commit: a50ab2b9bf2131ba44dd566343a1cac0d751319d
Parents: 8a290ea
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 11 18:18:48 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 12:13:06 2014 -0700

----------------------------------------------------------------------
 exec/java-exec/src/main/codegen/data/Casts.tdd               | 8 ++++----
 .../src/main/codegen/templates/CastVarCharDate.java          | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a50ab2b9/exec/java-exec/src/main/codegen/data/Casts.tdd
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/data/Casts.tdd b/exec/java-exec/src/main/codegen/data/Casts.tdd
index 0dc0090..73cdede 100644
--- a/exec/java-exec/src/main/codegen/data/Casts.tdd
+++ b/exec/java-exec/src/main/codegen/data/Casts.tdd
@@ -57,10 +57,10 @@
     {from: "TimeStampTZ", to: "Date", major: "Date"},
     {from: "TimeStampTZ", to: "TimeStamp", major: "Date"},
 
-    {from: "VarChar", to: "Date", major: "VarCharDate"},
-    {from: "VarChar", to: "TimeStamp", major: "VarCharDate"},
-    {from: "VarChar", to: "TimeStampTZ", major: "VarCharDate"},
-    {from: "VarChar", to: "Time", major: "VarCharDate"},
+    {from: "VarChar", to: "Date", major: "VarCharDate", alias: "datetype"},
+    {from: "VarChar", to: "TimeStamp", major: "VarCharDate", alias: "timestamptype"},
+    {from: "VarChar", to: "TimeStampTZ", major: "VarCharDate", alias: "timestamptztype"},
+    {from: "VarChar", to: "Time", major: "VarCharDate", alias: "timetype"},
 
     {from: "Date", to: "VarChar", major: "DateVarChar", bufferLength: "10"}
     {from: "TimeStamp", to: "VarChar", major: "DateVarChar", bufferLength: "23"},

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a50ab2b9/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
index 249b555..5a3127a 100644
--- a/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
+++ b/exec/java-exec/src/main/codegen/templates/CastVarCharDate.java
@@ -41,7 +41,7 @@ import org.joda.time.DateMidnight;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
 
 @SuppressWarnings("unused")
-@FunctionTemplate(name = "cast${type.to?upper_case}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
+@FunctionTemplate(names = {"cast${type.to?upper_case}", "${type.alias}"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
 public class Cast${type.from}To${type.to} implements DrillSimpleFunc {
 
   @Param ${type.from}Holder in;


[03/11] git commit: DRILL-686: Verify the operands while extracting projects that can be pushed into a scan

Posted by ja...@apache.org.
DRILL-686: Verify the operands while extracting projects that can be pushed into a scan


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/508cb5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/508cb5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/508cb5de

Branch: refs/heads/master
Commit: 508cb5dea9deb66661267abd174d54db99dc6bfa
Parents: 49d5333
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sat May 10 18:07:10 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:46:19 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/planner/physical/PrelUtil.java     | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/508cb5de/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index e98b970..bdc8b31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -41,7 +41,6 @@ import org.eigenbase.rex.RexCall;
 import org.eigenbase.rex.RexInputRef;
 import org.eigenbase.rex.RexLiteral;
 import org.eigenbase.rex.RexNode;
-import org.eigenbase.rex.RexOver;
 import org.eigenbase.rex.RexVisitorImpl;
 
 import com.beust.jcommander.internal.Lists;
@@ -144,12 +143,17 @@ public class PrelUtil {
     @Override
     public PathSegment visitCall(RexCall call) {
       if ("ITEM".equals(call.getOperator().getName())) {
-        return call.operands.get(0).accept(this)
-            .cloneWithNewChild(convertLiteral((RexLiteral) call.operands.get(1)));
-      }
-      // else
-      for (RexNode operand : call.operands) {
-        addColumn(operand.accept(this));
+        PathSegment mapOrArray = call.operands.get(0).accept(this);
+        if (mapOrArray != null) {
+          if (call.operands.get(1) instanceof RexLiteral) {
+            return mapOrArray.cloneWithNewChild(convertLiteral((RexLiteral) call.operands.get(1)));
+          }
+          return mapOrArray;
+        }
+      } else {
+        for (RexNode operand : call.operands) {
+          addColumn(operand.accept(this));
+        }
       }
       return null;
     }


[06/11] git commit: DRILL-529: 'atom' rule in Antlr grammar uses wrong token for calculating ExpressionPosition

Posted by ja...@apache.org.
DRILL-529: 'atom' rule in Antlr grammar uses wrong token for calculating ExpressionPosition


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fb973c66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fb973c66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fb973c66

Branch: refs/heads/master
Commit: fb973c66eb538ba5e780fe1f6369f13fa242f1de
Parents: fafb576
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Tue Apr 15 16:59:47 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:47:57 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/expression/parser/ExprParser.g     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fb973c66/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
----------------------------------------------------------------------
diff --git a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
index 5ad7099..9737e5d 100644
--- a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
+++ b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
@@ -278,13 +278,13 @@ xorExpr returns [LogicalExpression e]
   
 unaryExpr returns [LogicalExpression e]
   :  sign=(Plus|Minus)? Number {$e = ValueExpressions.getNumericExpression($sign.text, $Number.text, pos(($sign != null) ? $sign : $Number)); }
-  |  Minus atom {$e = FunctionCallFactory.createExpression("u-", pos($atom.start), $atom.e); }
-  |  Excl atom {$e= FunctionCallFactory.createExpression("!", pos($atom.start), $atom.e); }
+  |  Minus atom {$e = FunctionCallFactory.createExpression("u-", pos($Minus), $atom.e); }
+  |  Excl atom {$e= FunctionCallFactory.createExpression("!", pos($Excl), $atom.e); }
   |  atom {$e = $atom.e; }
   ;
 
 atom returns [LogicalExpression e]
-  :  Bool {$e = new ValueExpressions.BooleanExpression($Bool.text, pos($atom.start)); }
+  :  Bool {$e = new ValueExpressions.BooleanExpression($Bool.text, pos($Bool)); }
   |  lookup {$e = $lookup.e; }
   ;
 


[04/11] git commit: DRILL-696: Use standard HBase configuration name/value in HBaseStoragePluginConfig

Posted by ja...@apache.org.
DRILL-696: Use standard HBase configuration name/value in HBaseStoragePluginConfig


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6e6c661f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6e6c661f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6e6c661f

Branch: refs/heads/master
Commit: 6e6c661fd8099fb7f385346ba38e2026c84fc67a
Parents: 508cb5d
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Sun May 11 03:19:43 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 11:46:38 2014 -0700

----------------------------------------------------------------------
 .../store/hbase/HBaseStoragePluginConfig.java   | 52 +++++++++++---------
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  2 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java | 13 +++--
 .../hbase/hbase_scan_screen_physical.json       |  6 ++-
 ...base_scan_screen_physical_column_select.json |  6 ++-
 ...base_scan_screen_physical_family_select.json |  6 ++-
 .../src/test/resources/storage-plugins.json     | 39 ++-------------
 distribution/src/resources/storage-plugins.json |  8 +--
 8 files changed, 58 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
index 5a434d6..054d65f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePluginConfig.java
@@ -17,45 +17,43 @@
  */
 package org.apache.drill.exec.store.hbase;
 
+import java.util.Map;
+
 import org.apache.drill.common.logical.StoragePluginConfigBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 @JsonTypeName("hbase")
 public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements DrillHBaseConstants {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseStoragePluginConfig.class);
 
-  @JsonProperty
-  public String zookeeperQuorum;
-
-  @JsonProperty
-  public int zookeeperPort;
+  private Map<String, String> config;
 
+  @JsonIgnore
   private Configuration hbaseConf;
-  private HConnectionKey hbaseConfKey;
 
   @JsonCreator
-  public HBaseStoragePluginConfig(@JsonProperty("zookeeperQuorum") String zookeeperQuorum,
-                                  @JsonProperty("zookeeperPort") int zookeeperPort) {
-    this.zookeeperQuorum = zookeeperQuorum;
-    this.zookeeperPort = zookeeperPort;
-
-    this.hbaseConf = HBaseConfiguration.create();
-    logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}' node '{}'.",
-        zookeeperQuorum, zookeeperPort, hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-    if (zookeeperQuorum != null && zookeeperQuorum.length() != 0) {
-      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum);
-      hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
+  public HBaseStoragePluginConfig(@JsonProperty("config") Map<String, String> props) {
+    this.config = props;
+    if (config == null) {
+      config = Maps.newHashMap();
     }
-    this.hbaseConfKey = new HConnectionKey(hbaseConf);
+    logger.debug("Configuring HBase StoragePlugin with zookeeper quorum '{}', port '{}'.",
+        config.get(HConstants.ZOOKEEPER_QUORUM), config.get(HBASE_ZOOKEEPER_PORT));
+  }
+
+  @JsonProperty
+  public Map<String, String> getConfig() {
+    return ImmutableMap.copyOf(config);
   }
 
   @Override
@@ -66,24 +64,32 @@ public class HBaseStoragePluginConfig extends StoragePluginConfigBase implements
       return false;
     }
     HBaseStoragePluginConfig that = (HBaseStoragePluginConfig) o;
-    return this.hbaseConfKey.equals(that.hbaseConfKey);
+    return config.equals(that.config);
   }
 
   @Override
   public int hashCode() {
-    return this.hbaseConfKey != null ? this.hbaseConfKey.hashCode() : 0;
+    return this.config != null ? this.config.hashCode() : 0;
   }
 
   @JsonIgnore
   public Configuration getHBaseConf() {
+    if (hbaseConf == null) {
+      hbaseConf = HBaseConfiguration.create();
+      if (config != null) {
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+          hbaseConf.set(entry.getKey(), entry.getValue());
+        }
+      }
+    }
     return hbaseConf;
   }
 
   @JsonIgnore
   @VisibleForTesting
   public void setZookeeperPort(int zookeeperPort) {
-    this.zookeeperPort = zookeeperPort;
-    hbaseConf.setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
+    this.config.put(HBASE_ZOOKEEPER_PORT, String.valueOf(zookeeperPort));
+    getHBaseConf().setInt(HBASE_ZOOKEEPER_PORT, zookeeperPort);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 0753a4d..a68cf70 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -76,7 +76,7 @@ public class BaseHBaseTest extends BaseTestQuery {
 
   protected String getPlanText(String planFile, String tableName) throws IOException {
     return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8)
-        .replaceFirst("\"zookeeperPort\".*:.*\\d+", "\"zookeeperPort\" : " + HBaseTestsSuite.getZookeeperPort())
+        .replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\" : " + HBaseTestsSuite.getZookeeperPort())
         .replace("[TABLE_NAME]", tableName);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 36c31b7..32d7aaa 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -40,7 +38,8 @@ import org.junit.runners.Suite.SuiteClasses;
   TestHBaseFilterPushDown.class,
   TestHBaseProjectPushDown.class})
 public class HBaseTestsSuite {
-  private static final Log LOG = LogFactory.getLog(HBaseTestsSuite.class);
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class);
+
   private static final boolean IS_DEBUG = ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
 
   protected static final String TEST_TABLE_1 = "TestTable1";
@@ -70,11 +69,11 @@ public class HBaseTestsSuite {
           }
 
           if (manageHBaseCluster) {
-            LOG.info("Starting HBase mini cluster.");
+            logger.info("Starting HBase mini cluster.");
             UTIL = new HBaseTestingUtility(conf);
             UTIL.startMiniCluster();
             hbaseClusterCreated = true;
-            LOG.info("HBase mini cluster started.");
+            logger.info("HBase mini cluster started. Zookeeper port: '{}'", getZookeeperPort());
           }
 
           admin = new HBaseAdmin(conf);
@@ -104,9 +103,9 @@ public class HBaseTestsSuite {
         }
 
         if (hbaseClusterCreated) {
-          LOG.info("Shutting down HBase mini cluster.");
+          logger.info("Shutting down HBase mini cluster.");
           UTIL.shutdownMiniCluster();
-          LOG.info("HBase mini cluster stopped.");
+          logger.info("HBase mini cluster stopped.");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
index 17e17e4..7f9015b 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical.json
@@ -15,8 +15,10 @@
     storage:
     {
       "type":"hbase",
-      "zookeeperQuorum" : "localhost",
-      "zookeeperPort" : 2181
+      config : {
+        "hbase.zookeeper.quorum" : "localhost",
+        "hbase.zookeeper.property.clientPort" : 2181
+      }
     }
   },
   {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index dc08031..f399f6f 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -15,8 +15,10 @@
     storage:
     {
       "type":"hbase",
-      "zookeeperQuorum" : "localhost",
-      "zookeeperPort" : 2181
+      config : {
+        "hbase.zookeeper.quorum" : "localhost",
+        "hbase.zookeeper.property.clientPort" : 2181
+      }
     },
     columns: [
       "`f2`.c1", "`f2`.c2", "row_key"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
index ce027a0..0002164 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_family_select.json
@@ -15,8 +15,10 @@
     storage:
     {
       "type":"hbase",
-      "zookeeperQuorum" : "localhost",
-      "zookeeperPort" : 2181
+      config : {
+        "hbase.zookeeper.quorum" : "localhost",
+        "hbase.zookeeper.property.clientPort" : 2181
+      }
     },
     columns: [
       "f2"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/contrib/storage-hbase/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/storage-plugins.json b/contrib/storage-hbase/src/test/resources/storage-plugins.json
index 160583a..0e93f7e 100644
--- a/contrib/storage-hbase/src/test/resources/storage-plugins.json
+++ b/contrib/storage-hbase/src/test/resources/storage-plugins.json
@@ -1,40 +1,11 @@
 {
   "storage":{
-    dfs: {
-      type: "file",
-      connection: "file:///",
-      formats: {
-        "psv" : {
-          type: "text",
-          extensions: [ "tbl" ],
-          delimiter: "|"
-        },
-        "csv" : {
-          type: "text",
-          extensions: [ "csv" ],
-          delimiter: ","
-        },
-        "tsv" : {
-          type: "text",
-          extensions: [ "tsv" ],
-          delimiter: "\t"
-        },
-        "parquet" : {
-          type: "parquet"
-        },
-        "json" : {
-          type: "json"
-        }
-      }
-    },
-    cp: {
-      type: "file",
-      connection: "classpath:///"
-    },
     hbase : {
       type:"hbase",
-      zookeeperQuorum : "localhost",
-      zookeeperPort : 2181
+      config : {
+        "hbase.zookeeper.quorum" : "localhost",
+        "hbase.zookeeper.property.clientPort" : 2181
+      }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6e6c661f/distribution/src/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-plugins.json b/distribution/src/resources/storage-plugins.json
index c3f4a4b..3b1cbd0 100644
--- a/distribution/src/resources/storage-plugins.json
+++ b/distribution/src/resources/storage-plugins.json
@@ -59,9 +59,11 @@
 
     /*,
     hbase : {
-        type:"hbase",
-        zookeeperQuorum : "localhost",
-        zookeeperPort : 2181
+      type:"hbase",
+      config : {
+        "hbase.zookeeper.quorum" : "localhost",
+        "hbase.zookeeper.property.clientPort" : 2181
+      }
     }
     */
   }


[10/11] git commit: Disable multiphase aggregation.

Posted by ja...@apache.org.
Disable multiphase aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f7b7b1a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f7b7b1a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f7b7b1a0

Branch: refs/heads/master
Commit: f7b7b1a0211bef4cf663546e1d32cfb2a384222f
Parents: e4d3c7c
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon May 12 17:54:01 2014 -0700
Committer: Aditya Kishore <ad...@maprtech.com>
Committed: Mon May 12 17:54:01 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/planner/physical/PlannerSettings.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f7b7b1a0/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 0f694af..5bead30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -34,7 +34,7 @@ public class PlannerSettings implements FrameworkContext{
   public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);  
   public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);  
   public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);  
-  public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);  
+  public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", false);  
   
   public OptionManager options = null;
 


[11/11] git commit: Enable class unloading in unit test.

Posted by ja...@apache.org.
Enable class unloading in unit test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1b20b6e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1b20b6e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1b20b6e9

Branch: refs/heads/master
Commit: 1b20b6e9e9f98dc7fea14e23f80f152b08971628
Parents: f7b7b1a
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon May 12 17:54:31 2014 -0700
Committer: Aditya Kishore <ad...@maprtech.com>
Committed: Mon May 12 17:54:31 2014 -0700

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1b20b6e9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f567b64..81054dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -259,7 +259,7 @@
           <artifactId>maven-surefire-plugin</artifactId>
           <version>2.17</version>
           <configuration>
-            <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=13096M </argLine>
+            <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=13096M -XX:+CMSClassUnloadingEnabled</argLine>
             <forkCount>1</forkCount>
             <reuseForks>true</reuseForks>
             <additionalClasspathElements>


[09/11] git commit: DRILL-690: Generate 2 phase plans for Hash Aggr and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.

Posted by ja...@apache.org.
DRILL-690: Generate 2 phase plans for Hash Aggr and Streaming Aggr only if the aggr functions are SUM, MIN, MAX.

Added session options for enabling/disabling aggregations, joins and multiphase aggregations.  Modified DrillRuleSets to populate the physical rules based on these options rather than from a static list.  Added matches() implementations for the join and aggregation rules.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e4d3c7ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e4d3c7ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e4d3c7ca

Branch: refs/heads/master
Commit: e4d3c7ca9e2cc20defb5fae7c0373d982147cabb
Parents: a50ab2b
Author: Aman Sinha <as...@maprtech.com>
Authored: Sun May 11 17:57:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 12 12:19:22 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/logical/DrillRuleSets.java     | 55 ++++++++++++----
 .../exec/planner/physical/AggPruleBase.java     | 18 ++++++
 .../exec/planner/physical/HashAggPrule.java     | 56 +++++++++++++---
 .../exec/planner/physical/HashJoinPrule.java    |  5 ++
 .../exec/planner/physical/MergeJoinPrule.java   |  5 ++
 .../exec/planner/physical/PlannerSettings.java  | 29 ++++++++-
 .../drill/exec/planner/physical/PrelUtil.java   |  5 ++
 .../exec/planner/physical/StreamAggPrule.java   | 67 ++++++++++++--------
 .../drill/exec/planner/sql/DrillSqlWorker.java  | 12 ++--
 .../server/options/SystemOptionManager.java     |  7 +-
 10 files changed, 204 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 0d6da25..c07fee3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -17,22 +17,14 @@
  */
 package org.apache.drill.exec.planner.logical;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import net.hydromatic.optiq.tools.RuleSet;
 
+import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.physical.*;
-import org.apache.drill.exec.planner.physical.FilterPrule;
-import org.apache.drill.exec.planner.physical.HashAggPrule;
-import org.apache.drill.exec.planner.physical.HashJoinPrule;
-import org.apache.drill.exec.planner.physical.LimitPrule;
-import org.apache.drill.exec.planner.physical.MergeJoinPrule;
-import org.apache.drill.exec.planner.physical.ProjectPrule;
-import org.apache.drill.exec.planner.physical.ScanPrule;
-import org.apache.drill.exec.planner.physical.ScreenPrule;
-import org.apache.drill.exec.planner.physical.SortConvertPrule;
-import org.apache.drill.exec.planner.physical.SortPrule;
-import org.apache.drill.exec.planner.physical.StreamAggPrule;
 import org.eigenbase.rel.RelFactories;
 import org.eigenbase.rel.rules.MergeProjectRule;
 import org.eigenbase.rel.rules.PushFilterPastJoinRule;
@@ -102,6 +94,7 @@ public class DrillRuleSets {
       MergeProjectRule.INSTANCE
       ));
 
+  /* 
   public static final RuleSet DRILL_PHYSICAL_MEM = new DrillRuleSet(ImmutableSet.of( //
 //      DrillScanRule.INSTANCE,
 //      DrillFilterRule.INSTANCE,
@@ -150,12 +143,50 @@ public class DrillRuleSets {
 //    PushJoinThroughJoinRule.LEFT, //
 //    PushSortPastProjectRule.INSTANCE, //
     ));
-
+*/
   public static final RuleSet DRILL_PHYSICAL_DISK = new DrillRuleSet(ImmutableSet.of( //
       ProjectPrule.INSTANCE
 
     ));
 
+  public static final RuleSet getPhysicalRules(QueryContext qcontext) {
+    List<RelOptRule> ruleList = new ArrayList<RelOptRule>(); 
+
+    
+    ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
+    ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
+    ruleList.add(SortConvertPrule.INSTANCE);
+    ruleList.add(SortPrule.INSTANCE);
+    ruleList.add(ProjectPrule.INSTANCE);
+    ruleList.add(ScanPrule.INSTANCE);
+    ruleList.add(ScreenPrule.INSTANCE);
+    ruleList.add(ExpandConversionRule.INSTANCE);
+    ruleList.add(FilterPrule.INSTANCE);
+    ruleList.add(LimitPrule.INSTANCE);
+    ruleList.add(WriterPrule.INSTANCE);
+    ruleList.add(PushLimitToTopN.INSTANCE);
+    
+    PlannerSettings ps = qcontext.getPlannerSettings();
+    
+    if (ps.isHashAggEnabled()) {
+      ruleList.add(HashAggPrule.INSTANCE);        
+    }
+    
+    if (ps.isStreamAggEnabled()) {
+      ruleList.add(StreamAggPrule.INSTANCE);        
+    }
+    
+    if (ps.isHashJoinEnabled()) {
+      ruleList.add(HashJoinPrule.INSTANCE);        
+    }
+    
+    if (ps.isMergeJoinEnabled()) {
+      ruleList.add(MergeJoinPrule.INSTANCE);        
+    }
+  
+    return new DrillRuleSet(ImmutableSet.copyOf(ruleList)); 
+  }
+
   public static RuleSet create(ImmutableSet<RelOptRule> rules) {
     return new DrillRuleSet(rules);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 235018d..3bdcc2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -24,7 +24,9 @@ import net.hydromatic.optiq.util.BitSets;
 
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelOptRuleOperand;
 
 import com.google.common.collect.Lists;
@@ -54,4 +56,20 @@ public abstract class AggPruleBase extends RelOptRule {
     return groupByFields;
   }
   
+  // Create a 2-phase aggregation plan only when multiphase aggregation is enabled
+  // and every aggregate function is SUM, MIN or MAX. If any aggregate function
+  // is not one of these, we currently won't generate a 2-phase plan.
+  protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
+    if (! PrelUtil.getPlannerSettings(call.getPlanner()).isMultiPhaseAggEnabled()) {
+      return false;
+    }
+    
+    for (AggregateCall aggCall : aggregate.getAggCallList()) {
+      String name = aggCall.getAggregation().getName();
+      if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+        return false;
+      }
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 22b33ea..859941b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -27,6 +27,7 @@ import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -40,6 +41,11 @@ public class HashAggPrule extends AggPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isHashAggEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = call.rel(1);
@@ -50,30 +56,60 @@ public class HashAggPrule extends AggPruleBase {
       return;
     }
     
-    DrillDistributionTrait toDist = null;
     RelTraitSet traits = null;
 
     try {
       if (aggregate.getGroupSet().isEmpty()) {
-        toDist = DrillDistributionTrait.SINGLETON;
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
         createTransformRequest(call, aggregate, input, traits);
       } else {
         // hash distribute on all grouping keys
-        toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
-                                            ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
+        DrillDistributionTrait distOnAllKeys = 
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+                                       ImmutableList.copyOf(getDistributionField(aggregate, true /* get all grouping keys */)));
     
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys);
         createTransformRequest(call, aggregate, input, traits);
 
         // hash distribute on single grouping key
-        toDist = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
-                                            ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
+        DrillDistributionTrait distOnOneKey = 
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, 
+                                       ImmutableList.copyOf(getDistributionField(aggregate, false /* get single grouping key */)));
     
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnOneKey);
         createTransformRequest(call, aggregate, input, traits);
         
-        ///TODO: 2 phase hash aggregate plan 
+        if (create2PhasePlan(call, aggregate)) {
+          traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+          RelNode convertedInput = convert(input, traits);  
+
+          if (convertedInput instanceof RelSubset) {
+            RelSubset subset = (RelSubset) convertedInput;
+            for (RelNode rel : subset.getRelList()) {
+              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
+                RelNode newInput = convert(input, traits);
+
+                HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                HashToRandomExchangePrel exch =
+                    new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
+
+                HashAggPrel phase2Agg =  new HashAggPrel(aggregate.getCluster(), traits, exch,
+                                                         aggregate.getGroupSet(),
+                                                         aggregate.getAggCallList());
+
+                call.transformTo(phase2Agg);                   
+              }
+            }
+          }    
+        }
       } 
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index 5e3ace2..851877f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -41,6 +41,11 @@ public class HashJoinPrule extends JoinPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isHashJoinEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillJoinRel join = (DrillJoinRel) call.rel(0);
     final RelNode left = call.rel(1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index 30e2a97..30f651c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -46,6 +46,11 @@ public class MergeJoinPrule extends JoinPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isMergeJoinEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillJoinRel join = (DrillJoinRel) call.rel(0);
     final RelNode left = join.getLeft();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index ae895da..0f694af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -30,8 +30,13 @@ public class PlannerSettings implements FrameworkContext{
   private boolean useDefaultCosting = false; // True: use default Optiq costing, False: use Drill costing
 
   public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
-
-  public OptionManager options;
+  public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
+  public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);  
+  public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);  
+  public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);  
+  public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);  
+  
+  public OptionManager options = null;
 
   public PlannerSettings(OptionManager options){
     this.options = options;
@@ -57,6 +62,26 @@ public class PlannerSettings implements FrameworkContext{
     this.useDefaultCosting = defcost;
   }
   
+  public boolean isHashAggEnabled() {
+    return options.getOption(HASHAGG.getOptionName()).bool_val;  
+  }
+  
+  public boolean isStreamAggEnabled() {
+    return options.getOption(STREAMAGG.getOptionName()).bool_val;  
+  }
+  
+  public boolean isHashJoinEnabled() {
+    return options.getOption(HASHJOIN.getOptionName()).bool_val;
+  }
+  
+  public boolean isMergeJoinEnabled() {
+    return options.getOption(MERGEJOIN.getOptionName()).bool_val;  
+  }
+  
+  public boolean isMultiPhaseAggEnabled() {
+    return options.getOption(MULTIPHASE.getOptionName()).bool_val;
+  }
+  
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index bdc8b31..9ca9fbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.reltype.RelDataType;
 import org.eigenbase.rex.RexCall;
 import org.eigenbase.rex.RexInputRef;
@@ -86,6 +87,10 @@ public class PrelUtil {
   public static PlannerSettings getSettings(RelOptCluster cluster){
     return cluster.getPlanner().getFrameworkContext().unwrap(PlannerSettings.class);
   }
+  
+  public static PlannerSettings getPlannerSettings(RelOptPlanner planner) {
+    return planner.getFrameworkContext().unwrap(PlannerSettings.class);
+  }
 
   public static PhysicalOperator removeSvIfRequired(PhysicalOperator child, SelectionVectorMode... allowed){
     SelectionVectorMode current = child.getSVMode();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 62b9aa5..bccdea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelCollationImpl;
@@ -34,6 +35,7 @@ import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -48,6 +50,11 @@ public class StreamAggPrule extends AggPruleBase {
   }
 
   @Override
+  public boolean matches(RelOptRuleCall call) {
+    return PrelUtil.getPlannerSettings(call.getPlanner()).isStreamAggEnabled();
+  }
+  
+  @Override
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = aggregate.getChild();
@@ -82,31 +89,41 @@ public class StreamAggPrule extends AggPruleBase {
         // might be causing some problem. 
         /// TODO: re-enable this plan after resolving the issue.  
         // createTransformRequest(call, aggregate, input, traits);
-       
-        
- /*       
-        // create a 2-phase plan - commented out for now until we resolve planning for 'ANY' distribution 
-        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(DrillDistributionTrait.ANY);
-
-        RelNode convertedInput = convert(input, traits);
-        StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput,
-                                                    aggregate.getGroupSet(),
-                                                    aggregate.getAggCallList());
-
-        int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-        
-        HashToMergeExchangePrel exch =
-            new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
-                                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
-                                        collation,
-                                        numEndPoints);
-        
-        StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), traits, exch,
-                                                     aggregate.getGroupSet(),
-                                                     aggregate.getAggCallList());
-
-        call.transformTo(phase2Agg);      
-  */     
+ 
+        if (create2PhasePlan(call, aggregate)) {
+          traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
+
+          RelNode convertedInput = convert(input, traits);  
+
+          if (convertedInput instanceof RelSubset) {
+            RelSubset subset = (RelSubset) convertedInput;
+            for (RelNode rel : subset.getRelList()) {
+              if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
+                DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);              
+                traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
+                RelNode newInput = convert(input, traits);
+
+                StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+                HashToMergeExchangePrel exch =
+                    new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+                        phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
+                        collation,
+                        numEndPoints);
+
+                StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), traits, exch,
+                    aggregate.getGroupSet(),
+                    aggregate.getAggCallList());
+
+                call.transformTo(phase2Agg);                   
+              }
+            }
+          }    
+        }
       } 
     } catch (InvalidRelException e) {
       tracer.warning(e.toString());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index bd57785..7477440 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -86,20 +86,22 @@ public class DrillSqlWorker {
         .traitDefs(traitDefs) //
         .convertletTable(new DrillConvertletTable()) //
         .context(context.getPlannerSettings()) //
-        .ruleSets(getRules(context.getStorage())) //
+        .ruleSets(getRules(context)) //
         .costFactory(costFactory) //
         .build();
     this.planner = Frameworks.getPlanner(config);
 
   }
 
-  private static RuleSet[] getRules(StoragePluginRegistry storagePluginRegistry) {
+  private static RuleSet[] getRules(QueryContext context) {
+    StoragePluginRegistry storagePluginRegistry = context.getStorage();
     if (allRules == null) {
       synchronized (DrillSqlWorker.class) {
         if (allRules == null) {
-          RuleSet dirllPhysicalMem = DrillRuleSets.mergedRuleSets(
-              DrillRuleSets.DRILL_PHYSICAL_MEM, storagePluginRegistry.getStoragePluginRuleSet());
-          allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, dirllPhysicalMem};
+          RuleSet drillPhysicalMem = DrillRuleSets.mergedRuleSets(
+              DrillRuleSets.getPhysicalRules(context),
+              storagePluginRegistry.getStoragePluginRuleSet());
+          allRules = new RuleSet[] {DrillRuleSets.DRILL_BASIC_RULES, drillPhysicalMem};
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e4d3c7ca/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 98975e4..cfe8e2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -33,7 +33,12 @@ import com.google.common.collect.Maps;
 public class SystemOptionManager implements OptionManager{
 
   private final OptionValidator[] VALIDATORS = {
-      PlannerSettings.EXCHANGE
+      PlannerSettings.EXCHANGE, 
+      PlannerSettings.HASHAGG,
+      PlannerSettings.STREAMAGG,
+      PlannerSettings.HASHJOIN,
+      PlannerSettings.MERGEJOIN, 
+      PlannerSettings.MULTIPHASE
   };
 
   private DistributedMap<OptionValue> options;