You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/20 05:01:49 UTC

[01/14] git commit: DRILL-713: Fix resizing of the hash table.

Repository: incubator-drill
Updated Branches:
  refs/heads/master 70fab8c96 -> 5d7e3d3ab


DRILL-713: Fix resizing of the hash table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1f4276e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1f4276e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1f4276e9

Branch: refs/heads/master
Commit: 1f4276e909e1629f359f7ccaa2c777b6dce0e6d9
Parents: 70fab8c
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 01:33:53 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 01:33:53 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/common/HashTableTemplate.java   | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1f4276e9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 402e395..3a8e609 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -476,15 +476,20 @@ public abstract class HashTableTemplate implements HashTable {
 
   private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
 
-    // resize hash table if needed and transfer the metadata
-    resizeAndRehashIfNeeded(currentIdx);
-
     addBatchIfNeeded(currentIdx);
 
     BatchHolder bh = batchHolders.get( (currentIdx >>> 16) & BATCH_MASK);
 
     if (bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx)) {
       numEntries++ ;
+
+      /* Resize the hash table if needed and transfer the metadata.
+       * Resize only after inserting the current entry into the hash table;
+       * otherwise our calculated lastEntryBatch and lastEntryIdx
+       * become invalid after the resize.
+       */
+      resizeAndRehashIfNeeded();
+
       return true;
     }
 
@@ -548,7 +553,7 @@ public abstract class HashTableTemplate implements HashTable {
   // For each entry in the old hash table, re-hash it to the new table and update the metadata
   // in the new table.. the metadata consists of the startIndices, links and hashValues.
   // Note that the keys stored in the BatchHolders are not moved around.
-  private void resizeAndRehashIfNeeded(int currentIdx) {
+  private void resizeAndRehashIfNeeded() {
     if (numEntries < threshold)
       return;
 


[11/14] git commit: DRILL-764: Add support for 'convert_to()' and 'convert_from()' functions from SQL

Posted by ja...@apache.org.
DRILL-764: Add support for 'convert_to()' and 'convert_from()' functions from SQL


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/73881506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/73881506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/73881506

Branch: refs/heads/master
Commit: 738815061432fb610962bc72bebda3f52d5b148a
Parents: 4b0d060
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Fri May 16 17:13:44 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:13 2014 -0700

----------------------------------------------------------------------
 .../expr/fn/impl/conv/DummyConvertFrom.java     |  43 ++++++
 .../exec/expr/fn/impl/conv/DummyConvertTo.java  |  43 ++++++
 .../drill/exec/planner/logical/DrillOptiq.java  |   3 +
 .../java/org/apache/drill/BaseTestQuery.java    |   2 +-
 .../physical/impl/TestConvertFunctions.java     | 147 +++++++++++--------
 5 files changed, 176 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java
new file mode 100644
index 0000000..ac75f48
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.expr.fn.impl.conv;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * This class and {@link DummyConvertTo} merely act as placeholders so that Optiq
+ * allows the 'convert_to()' and 'convert_from()' functions in SQL.
+ */
+@FunctionTemplate(name = "convert_from", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+public class DummyConvertFrom implements DrillSimpleFunc {
+
+  @Output VarBinaryHolder out;
+
+  @Override
+  public void setup(RecordBatch incoming) { }
+
+  @Override
+  public void eval() { }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java
new file mode 100644
index 0000000..36ddf6d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.expr.fn.impl.conv;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * This class and {@link DummyConvertFrom} merely act as placeholders so that Optiq
+ * allows the 'convert_to()' and 'convert_from()' functions in SQL.
+ */
+@FunctionTemplate(name = "convert_to", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+public class DummyConvertTo implements DrillSimpleFunc {
+
+  @Output VarBinaryHolder out;
+
+  @Override
+  public void setup(RecordBatch incoming) { }
+
+  @Override
+  public void eval() { }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index e900fc2..1e0f7ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -326,6 +326,9 @@ public class DrillOptiq {
 
               return FunctionCallFactory.createExpression(functionName, args.subList(0, 1));
           }
+      } else if ((functionName.equals("convert_from") || functionName.equals("convert_to"))
+                    && args.get(1) instanceof QuotedString) {
+        return FunctionCallFactory.createConvert(functionName, ((QuotedString)args.get(1)).value, args.get(0), ExpressionPosition.UNKNOWN);
       }
 
       return FunctionCallFactory.createExpression(functionName, args);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 062511e..8121a54 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -113,7 +113,7 @@ public class BaseTestQuery extends ExecTest{
     return testRunAndReturn(QueryType.PHYSICAL, physical);
   }
 
-  private List<QueryResultBatch>  testRunAndReturn(QueryType type, String query) throws Exception{
+  protected List<QueryResultBatch>  testRunAndReturn(QueryType type, String query) throws Exception{
     query = query.replace("[WORKING_PATH]", TestTools.getWorkingPath());
     return client.runQuery(type, query);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
index 4f9e8e9..e2935a9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
@@ -30,8 +30,8 @@ import java.util.List;
 import mockit.Injectable;
 
 import org.apache.drill.BaseTestQuery;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserServer;
@@ -66,172 +66,190 @@ public class TestConvertFunctions extends BaseTestQuery {
 
   @Test
   public void testDateTime1() throws Throwable {
-    runTest("(convert_from(binary_string('" + DATE_TIME_BE + "'), 'TIME_EPOCH_BE'))", time);
+    verifyPhysicalPlan("(convert_from(binary_string('" + DATE_TIME_BE + "'), 'TIME_EPOCH_BE'))", time);
   }
 
   @Test
   public void testDateTime2() throws Throwable {
-    runTest("convert_from(binary_string('" + DATE_TIME_LE + "'), 'TIME_EPOCH')", time);
+    verifyPhysicalPlan("convert_from(binary_string('" + DATE_TIME_LE + "'), 'TIME_EPOCH')", time);
   }
 
   @Test
   public void testDateTime3() throws Throwable {
-    runTest("convert_from(binary_string('" + DATE_TIME_BE + "'), 'DATE_EPOCH_BE')", date );
+    verifyPhysicalPlan("convert_from(binary_string('" + DATE_TIME_BE + "'), 'DATE_EPOCH_BE')", date );
   }
 
   @Test
   public void testDateTime4() throws Throwable {
-    runTest("convert_from(binary_string('" + DATE_TIME_LE + "'), 'DATE_EPOCH')", date);
+    verifyPhysicalPlan("convert_from(binary_string('" + DATE_TIME_LE + "'), 'DATE_EPOCH')", date);
   }
 
   @Test
   public void testFixedInts1() throws Throwable {
-    runTest("convert_from(binary_string('\\xAD'), 'TINYINT')", (byte) 0xAD);
+    verifyPhysicalPlan("convert_from(binary_string('\\xAD'), 'TINYINT')", (byte) 0xAD);
   }
 
   @Test
   public void testFixedInts2() throws Throwable {
-    runTest("convert_from(binary_string('\\xFE\\xCA'), 'SMALLINT')", (short) 0xCAFE);
+    verifyPhysicalPlan("convert_from(binary_string('\\xFE\\xCA'), 'SMALLINT')", (short) 0xCAFE);
   }
 
   @Test
   public void testFixedInts3() throws Throwable {
-    runTest("convert_from(binary_string('\\xCA\\xFE'), 'SMALLINT_BE')", (short) 0xCAFE);
+    verifyPhysicalPlan("convert_from(binary_string('\\xCA\\xFE'), 'SMALLINT_BE')", (short) 0xCAFE);
   }
 
   @Test
   public void testFixedInts4() throws Throwable {
-    runTest("convert_from(binary_string('\\xBE\\xBA\\xFE\\xCA'), 'INT')", 0xCAFEBABE);
+    verifyPhysicalPlan("convert_from(binary_string('\\xBE\\xBA\\xFE\\xCA'), 'INT')", 0xCAFEBABE);
+  }
+
+  @Test
+  public void testFixedInts4SQL_from() throws Throwable {
+    verifySQL("select"
+           + "   convert_from(binary_string('\\xBE\\xBA\\xFE\\xCA'), 'INT')"
+           + " from"
+           + "   cp.`employee.json` LIMIT 1",
+            0xCAFEBABE);
+  }
+
+  @Test
+  public void testFixedInts4SQL_to() throws Throwable {
+    verifySQL("select"
+           + "   convert_to(-889275714, 'INT')"
+           + " from"
+           + "   cp.`employee.json` LIMIT 1",
+           new byte[] {(byte) 0xBE, (byte) 0xBA, (byte) 0xFE, (byte) 0xCA});
   }
 
   @Test
   public void testFixedInts5() throws Throwable {
-    runTest("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE'), 'INT_BE')", 0xCAFEBABE);
+    verifyPhysicalPlan("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE'), 'INT_BE')", 0xCAFEBABE);
   }
 
   @Test
   public void testFixedInts6() throws Throwable {
-    runTest("convert_from(binary_string('\\xEF\\xBE\\xAD\\xDE\\xBE\\xBA\\xFE\\xCA'), 'BIGINT')", 0xCAFEBABEDEADBEEFL);
+    verifyPhysicalPlan("convert_from(binary_string('\\xEF\\xBE\\xAD\\xDE\\xBE\\xBA\\xFE\\xCA'), 'BIGINT')", 0xCAFEBABEDEADBEEFL);
   }
 
   @Test
   public void testFixedInts7() throws Throwable {
-    runTest("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE\\xDE\\xAD\\xBE\\xEF'), 'BIGINT_BE')", 0xCAFEBABEDEADBEEFL);
+    verifyPhysicalPlan("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE\\xDE\\xAD\\xBE\\xEF'), 'BIGINT_BE')", 0xCAFEBABEDEADBEEFL);
   }
 
   @Test
   public void testFixedInts8() throws Throwable {
-    runTest("convert_from(convert_to(cast(77 as varchar(2)), 'INT_BE'), 'INT_BE')", 77);
+    verifyPhysicalPlan("convert_from(convert_to(cast(77 as varchar(2)), 'INT_BE'), 'INT_BE')", 77);
   }
 
   @Test
   public void testFixedInts9() throws Throwable {
-    runTest("convert_to(cast(77 as varchar(2)), 'INT_BE')", new byte[] {0, 0, 0, 77});
+    verifyPhysicalPlan("convert_to(cast(77 as varchar(2)), 'INT_BE')", new byte[] {0, 0, 0, 77});
   }
 
   @Test
   public void testFixedInts10() throws Throwable {
-    runTest("convert_to(cast(77 as varchar(2)), 'INT')", new byte[] {77, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(cast(77 as varchar(2)), 'INT')", new byte[] {77, 0, 0, 0});
   }
 
   @Test
   public void testFixedInts11() throws Throwable {
-    runTest("convert_to(77, 'BIGINT_BE')", new byte[] {0, 0, 0, 0, 0, 0, 0, 77});
+    verifyPhysicalPlan("convert_to(77, 'BIGINT_BE')", new byte[] {0, 0, 0, 0, 0, 0, 0, 77});
   }
 
   @Test
   public void testFixedInts12() throws Throwable {
-    runTest("convert_to(9223372036854775807, 'BIGINT')", new byte[] {-1, -1, -1, -1, -1, -1, -1, 0x7f});
+    verifyPhysicalPlan("convert_to(9223372036854775807, 'BIGINT')", new byte[] {-1, -1, -1, -1, -1, -1, -1, 0x7f});
   }
 
   @Test
   public void testFixedInts13() throws Throwable {
-    runTest("convert_to(-9223372036854775808, 'BIGINT')", new byte[] {0, 0, 0, 0, 0, 0, 0, (byte)0x80});
+    verifyPhysicalPlan("convert_to(-9223372036854775808, 'BIGINT')", new byte[] {0, 0, 0, 0, 0, 0, 0, (byte)0x80});
   }
 
   @Test
   public void testVInts1() throws Throwable {
-    runTest("convert_to(cast(0 as int), 'INT_HADOOPV')", new byte[] {0});
+    verifyPhysicalPlan("convert_to(cast(0 as int), 'INT_HADOOPV')", new byte[] {0});
   }
 
   @Test
   public void testVInts2() throws Throwable {
-    runTest("convert_to(cast(128 as int), 'INT_HADOOPV')", new byte[] {-113, -128});
+    verifyPhysicalPlan("convert_to(cast(128 as int), 'INT_HADOOPV')", new byte[] {-113, -128});
   }
 
   @Test
   public void testVInts3() throws Throwable {
-    runTest("convert_to(cast(256 as int), 'INT_HADOOPV')", new byte[] {-114, 1, 0});
+    verifyPhysicalPlan("convert_to(cast(256 as int), 'INT_HADOOPV')", new byte[] {-114, 1, 0});
   }
 
   @Test
   public void testVInts4() throws Throwable {
-    runTest("convert_to(cast(65536 as int), 'INT_HADOOPV')", new byte[] {-115, 1, 0, 0});
+    verifyPhysicalPlan("convert_to(cast(65536 as int), 'INT_HADOOPV')", new byte[] {-115, 1, 0, 0});
   }
 
   @Test
   public void testVInts5() throws Throwable {
-    runTest("convert_to(cast(16777216 as int), 'INT_HADOOPV')", new byte[] {-116, 1, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(cast(16777216 as int), 'INT_HADOOPV')", new byte[] {-116, 1, 0, 0, 0});
   }
 
   @Test
   public void testVInts6() throws Throwable {
-    runTest("convert_to(4294967296, 'BIGINT_HADOOPV')", new byte[] {-117, 1, 0, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(4294967296, 'BIGINT_HADOOPV')", new byte[] {-117, 1, 0, 0, 0, 0});
   }
 
   @Test
   public void testVInts7() throws Throwable {
-    runTest("convert_to(1099511627776, 'BIGINT_HADOOPV')", new byte[] {-118, 1, 0, 0, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(1099511627776, 'BIGINT_HADOOPV')", new byte[] {-118, 1, 0, 0, 0, 0, 0});
   }
 
   @Test
   public void testVInts8() throws Throwable {
-    runTest("convert_to(281474976710656, 'BIGINT_HADOOPV')", new byte[] {-119, 1, 0, 0, 0, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(281474976710656, 'BIGINT_HADOOPV')", new byte[] {-119, 1, 0, 0, 0, 0, 0, 0});
   }
 
   @Test
   public void testVInts9() throws Throwable {
-    runTest("convert_to(72057594037927936, 'BIGINT_HADOOPV')", new byte[] {-120, 1, 0, 0, 0, 0, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(72057594037927936, 'BIGINT_HADOOPV')", new byte[] {-120, 1, 0, 0, 0, 0, 0, 0, 0});
   }
 
   @Test
   public void testVInts10() throws Throwable {
-    runTest("convert_to(9223372036854775807, 'BIGINT_HADOOPV')", new byte[] {-120, 127, -1, -1, -1, -1, -1, -1, -1});
+    verifyPhysicalPlan("convert_to(9223372036854775807, 'BIGINT_HADOOPV')", new byte[] {-120, 127, -1, -1, -1, -1, -1, -1, -1});
   }
 
   @Test
   public void testVInts11() throws Throwable {
-    runTest("convert_from(binary_string('\\x88\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", 9223372036854775807L);
+    verifyPhysicalPlan("convert_from(binary_string('\\x88\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", 9223372036854775807L);
   }
 
   @Test
   public void testVInts12() throws Throwable {
-    runTest("convert_to(-9223372036854775808, 'BIGINT_HADOOPV')", new byte[] {-128, 127, -1, -1, -1, -1, -1, -1, -1});
+    verifyPhysicalPlan("convert_to(-9223372036854775808, 'BIGINT_HADOOPV')", new byte[] {-128, 127, -1, -1, -1, -1, -1, -1, -1});
   }
 
   @Test
   public void testVInts13() throws Throwable {
-    runTest("convert_from(binary_string('\\x80\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", -9223372036854775808L);
+    verifyPhysicalPlan("convert_from(binary_string('\\x80\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", -9223372036854775808L);
   }
 
   @Test
   public void testBool1() throws Throwable {
-    runTest("convert_from(binary_string('\\x01'), 'BOOLEAN_BYTE')", true);
+    verifyPhysicalPlan("convert_from(binary_string('\\x01'), 'BOOLEAN_BYTE')", true);
   }
 
   @Test
   public void testBool2() throws Throwable {
-    runTest("convert_from(binary_string('\\x00'), 'BOOLEAN_BYTE')", false);
+    verifyPhysicalPlan("convert_from(binary_string('\\x00'), 'BOOLEAN_BYTE')", false);
   }
 
   @Test
   public void testBool3() throws Throwable {
-    runTest("convert_to(true, 'BOOLEAN_BYTE')", new byte[] {1});
+    verifyPhysicalPlan("convert_to(true, 'BOOLEAN_BYTE')", new byte[] {1});
   }
 
   @Test
   public void testBool4() throws Throwable {
-    runTest("convert_to(false, 'BOOLEAN_BYTE')", new byte[] {0});
+    verifyPhysicalPlan("convert_to(false, 'BOOLEAN_BYTE')", new byte[] {0});
   }
 
   @Test
@@ -240,47 +258,47 @@ public class TestConvertFunctions extends BaseTestQuery {
 
   @Test
   public void testFloats2() throws Throwable {
-    runTest("convert_from(convert_to(cast(77 as float4), 'FLOAT'), 'FLOAT')", new Float(77.0));
+    verifyPhysicalPlan("convert_from(convert_to(cast(77 as float4), 'FLOAT'), 'FLOAT')", new Float(77.0));
   }
 
   @Test
   public void testFloats3() throws Throwable {
-    runTest("convert_to(cast(1.4e-45 as float4), 'FLOAT')", new byte[] {1, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(cast(1.4e-45 as float4), 'FLOAT')", new byte[] {1, 0, 0, 0});
   }
 
   @Test
   public void testFloats4() throws Throwable {
-    runTest("convert_to(cast(3.4028235e+38 as float4), 'FLOAT')", new byte[] {-1, -1, 127, 127});
+    verifyPhysicalPlan("convert_to(cast(3.4028235e+38 as float4), 'FLOAT')", new byte[] {-1, -1, 127, 127});
   }
 
   @Test
   public void testFloats5(@Injectable final DrillbitContext bitContext,
                            @Injectable UserServer.UserClientConnection connection) throws Throwable {
-    runTest("convert_from(convert_to(cast(77 as float8), 'DOUBLE'), 'DOUBLE')", 77.0);
+    verifyPhysicalPlan("convert_from(convert_to(cast(77 as float8), 'DOUBLE'), 'DOUBLE')", 77.0);
   }
 
   @Test
   public void testFloats6(@Injectable final DrillbitContext bitContext,
                            @Injectable UserServer.UserClientConnection connection) throws Throwable {
-    runTest("convert_to(cast(77 as float8), 'DOUBLE')", new byte[] {0, 0, 0, 0, 0, 64, 83, 64});
+    verifyPhysicalPlan("convert_to(cast(77 as float8), 'DOUBLE')", new byte[] {0, 0, 0, 0, 0, 64, 83, 64});
   }
 
   @Test
   public void testFloats7(@Injectable final DrillbitContext bitContext,
                            @Injectable UserServer.UserClientConnection connection) throws Throwable {
-    runTest("convert_to(4.9e-324, 'DOUBLE')", new byte[] {1, 0, 0, 0, 0, 0, 0, 0});
+    verifyPhysicalPlan("convert_to(4.9e-324, 'DOUBLE')", new byte[] {1, 0, 0, 0, 0, 0, 0, 0});
   }
 
   @Test
   public void testFloats8(@Injectable final DrillbitContext bitContext,
                            @Injectable UserServer.UserClientConnection connection) throws Throwable {
-    runTest("convert_to(1.7976931348623157e+308, 'DOUBLE')", new byte[] {-1, -1, -1, -1, -1, -1, -17, 127});
+    verifyPhysicalPlan("convert_to(1.7976931348623157e+308, 'DOUBLE')", new byte[] {-1, -1, -1, -1, -1, -1, -17, 127});
   }
 
   @Test
   public void testUTF8() throws Throwable {
-    runTest("convert_from(binary_string('apache_drill'), 'UTF8')", "apache_drill");
-    runTest("convert_to('apache_drill', 'UTF8')", new byte[] {'a', 'p', 'a', 'c', 'h', 'e', '_', 'd', 'r', 'i', 'l', 'l'});
+    verifyPhysicalPlan("convert_from(binary_string('apache_drill'), 'UTF8')", "apache_drill");
+    verifyPhysicalPlan("convert_to('apache_drill', 'UTF8')", new byte[] {'a', 'p', 'a', 'c', 'h', 'e', '_', 'd', 'r', 'i', 'l', 'l'});
   }
 
   @Test
@@ -340,31 +358,27 @@ public class TestConvertFunctions extends BaseTestQuery {
     assertEquals(intVal, Integer.MIN_VALUE);
   }
 
-  protected <T> void runTest(String expression, T expectedResults) throws Throwable {
-    String testName = String.format("Expression: %s.", expression);
+  protected <T> void verifySQL(String sql, T expectedResults) throws Throwable {
+    verifyResults(sql, expectedResults, getRunResult(QueryType.SQL, sql));
+  }
+
+  protected <T> void verifyPhysicalPlan(String expression, T expectedResults) throws Throwable {
     expression = expression.replace("\\", "\\\\\\\\"); // "\\\\\\\\" => Java => "\\\\" => JsonParser => "\\" => AntlrParser "\"
 
     if (textFileContent == null) textFileContent = Resources.toString(Resources.getResource(CONVERSION_TEST_PHYSICAL_PLAN), Charsets.UTF_8);
     String planString = textFileContent.replace("__CONVERT_EXPRESSION__", expression);
 
-    Object[] results = getRunResult(planString);
-    assertEquals(testName, 1, results.length);
-    assertNotNull(testName, results[0]);
-    if (expectedResults.getClass().isArray()) {
-      assertArraysEquals(testName, expectedResults, results[0]);
-    } else {
-      assertEquals(testName, expectedResults, results[0]);
-    }
+    verifyResults(expression, expectedResults, getRunResult(QueryType.PHYSICAL, planString));
   }
 
-  protected Object[] getRunResult(String planString) throws Exception {
+  protected Object[] getRunResult(QueryType queryType, String planString) throws Exception {
+    List<QueryResultBatch> resultList = testRunAndReturn(queryType, planString);
+
     List<Object> res = new ArrayList<Object>();
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-
-    List<QueryResultBatch> resultList = testPhysicalWithResults(planString);
     for(QueryResultBatch result : resultList) {
-      loader.load(result.getHeader().getDef(), result.getData());
-      if (loader.getRecordCount() > 0) {
+      if (result.getData() != null) {
+        loader.load(result.getHeader().getDef(), result.getData());
         ValueVector v = loader.iterator().next().getValueVector();
         for (int j = 0; j < v.getAccessor().getValueCount(); j++) {
           if  (v instanceof VarCharVector) {
@@ -373,14 +387,25 @@ public class TestConvertFunctions extends BaseTestQuery {
             res.add(v.getAccessor().getObject(j));
           }
         }
+        loader.clear();
+        result.release();
       }
-      loader.clear();
-      result.release();
     }
 
     return res.toArray();
   }
 
+  protected <T> void verifyResults(String expression, T expectedResults, Object[] actualResults) throws Throwable {
+    String testName = String.format("Expression: %s.", expression);
+    assertEquals(testName, 1, actualResults.length);
+    assertNotNull(testName, actualResults[0]);
+    if (expectedResults.getClass().isArray()) {
+      assertArraysEquals(testName, expectedResults, actualResults[0]);
+    } else {
+      assertEquals(testName, expectedResults, actualResults[0]);
+    }
+  }
+
   protected void assertArraysEquals(Object expected, Object actual) {
     assertArraysEquals(null, expected, actual);
   }


[04/14] git commit: DRILL-769: Remove swapping using decimal holders.

Posted by ja...@apache.org.
DRILL-769: Remove swapping using decimal holders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5a78ff8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5a78ff8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5a78ff8c

Branch: refs/heads/master
Commit: 5a78ff8c546b80b0f747c558688786714eb09b20
Parents: 828a5c6
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 16:54:39 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 10:43:14 2014 -0700

----------------------------------------------------------------------
 .../templates/Decimal/DecimalFunctions.java     | 78 ++++++++++----------
 .../main/codegen/templates/ValueHolders.java    | 27 -------
 .../drill/jdbc/test/TestFunctionsQuery.java     | 13 ++++
 3 files changed, 50 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a78ff8c/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
index 6cd6ade..8f14e83 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
@@ -31,26 +31,26 @@ import org.apache.drill.exec.expr.annotations.Workspace;
     }
 </#macro>
 
-<#macro subtractBlock holderType left right result>
+<#macro subtractBlock holderType in1 in2 result>
 
             /* compute the result's size, integer part and fractional part */
-            result.scale   = Math.max(left.scale, right.scale);
+            result.scale   = Math.max(${in1}.scale, ${in2}.scale);
             result.precision = result.maxPrecision;
 
 
             int resultScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(result.scale);
             int resultIndex = result.nDecimalDigits- 1;
 
-            int leftScaleRoundedUp  = org.apache.drill.common.util.DecimalUtility.roundUp(left.scale);
-            int leftIntRoundedUp    = org.apache.drill.common.util.DecimalUtility.roundUp(left.precision - left.scale);
-            int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(right.scale);
+            int leftScaleRoundedUp  = org.apache.drill.common.util.DecimalUtility.roundUp(${in1}.scale);
+            int leftIntRoundedUp    = org.apache.drill.common.util.DecimalUtility.roundUp(${in1}.precision - ${in1}.scale);
+            int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(${in2}.scale);
 
-            int leftIndex  = left.nDecimalDigits - 1;
-            int rightIndex = right.nDecimalDigits - 1;
+            int leftIndex  = ${in1}.nDecimalDigits - 1;
+            int rightIndex = ${in2}.nDecimalDigits - 1;
 
             /* If the left scale is bigger, simply copy over the digits into result */
             while (leftScaleRoundedUp > rightScaleRoundedUp) {
-                result.setInteger(resultIndex, left.getInteger(leftIndex));
+                result.setInteger(resultIndex, ${in1}.getInteger(leftIndex));
                 leftIndex--;
                 resultIndex--;
                 leftScaleRoundedUp--;
@@ -60,7 +60,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
             int carry = 0;
             while(rightScaleRoundedUp > leftScaleRoundedUp) {
 
-                int difference = 0 - right.getInteger(rightIndex) - carry;
+                int difference = 0 - ${in2}.getInteger(rightIndex) - carry;
                 rightIndex--;
 
                 if (difference < 0) {
@@ -80,7 +80,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
              */
             while (leftScaleRoundedUp > 0) {
 
-                int difference = left.getInteger(leftIndex) - right.getInteger(rightIndex) - carry;
+                int difference = ${in1}.getInteger(leftIndex) - ${in2}.getInteger(rightIndex) - carry;
                 leftIndex--;
                 rightIndex--;
 
@@ -100,11 +100,11 @@ import org.apache.drill.exec.expr.annotations.Workspace;
              */
             while(leftIntRoundedUp > 0) {
 
-                int difference = left.getInteger(leftIndex);
+                int difference = ${in1}.getInteger(leftIndex);
                 leftIndex--;
 
                 if (rightIndex >= 0) {
-                    difference -= right.getInteger(rightIndex);
+                    difference -= ${in2}.getInteger(rightIndex);
                     rightIndex--;
                 }
 
@@ -123,20 +123,20 @@ import org.apache.drill.exec.expr.annotations.Workspace;
 
 </#macro>
 
-<#macro addBlock holderType left right result>
+<#macro addBlock holderType in1 in2 result>
 
         /* compute the result scale */
-        result.scale = Math.max(left.scale, right.scale);
+        result.scale = Math.max(${in1}.scale, ${in2}.scale);
         result.precision = result.maxPrecision;
 
         int resultScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(result.scale);
 
-        int leftScaleRoundedUp  = org.apache.drill.common.util.DecimalUtility.roundUp(left.scale);
-        int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(right.scale);
+        int leftScaleRoundedUp  = org.apache.drill.common.util.DecimalUtility.roundUp(${in1}.scale);
+        int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(${in2}.scale);
 
         /* starting index for each decimal */
-        int leftIndex  = left.nDecimalDigits - 1;
-        int rightIndex = right.nDecimalDigits - 1;
+        int leftIndex  = ${in1}.nDecimalDigits - 1;
+        int rightIndex = ${in2}.nDecimalDigits - 1;
         int resultIndex = result.nDecimalDigits - 1;
 
         /* If one of the scale is larger then simply copy it over
@@ -144,7 +144,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
          */
         while (leftScaleRoundedUp > rightScaleRoundedUp) {
 
-            result.setInteger(resultIndex, left.getInteger(leftIndex));
+            result.setInteger(resultIndex, ${in1}.getInteger(leftIndex));
             leftIndex--;
             resultIndex--;
             leftScaleRoundedUp--;
@@ -152,7 +152,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
         }
 
         while (rightScaleRoundedUp > leftScaleRoundedUp) {
-            result.setInteger((resultIndex), right.getInteger(rightIndex));
+            result.setInteger((resultIndex), ${in2}.getInteger(rightIndex));
             rightIndex--;
             resultIndex--;
             rightScaleRoundedUp--;
@@ -164,7 +164,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
         /* now the two scales are at the same level, we can add them */
         while (resultScaleRoundedUp > 0) {
 
-            sum += left.getInteger(leftIndex) + right.getInteger(rightIndex);
+            sum += ${in1}.getInteger(leftIndex) + ${in2}.getInteger(rightIndex);
             leftIndex--;
             rightIndex--;
 
@@ -182,7 +182,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
         /* add the integer part */
         while (leftIndex >= 0 && rightIndex >= 0) {
 
-            sum += left.getInteger(leftIndex) + right.getInteger(rightIndex);
+            sum += ${in1}.getInteger(leftIndex) + ${in2}.getInteger(rightIndex);
             leftIndex--;
             rightIndex--;
 
@@ -197,7 +197,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
         }
 
         while (resultIndex >= 0 && leftIndex >= 0) {
-            sum += left.getInteger(leftIndex);
+            sum += ${in1}.getInteger(leftIndex);
             leftIndex--;
 
             if (sum >= org.apache.drill.common.util.DecimalUtility.DIGITS_BASE) {
@@ -210,7 +210,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
         }
 
         while (resultIndex >= 0 && rightIndex >= 0) {
-            sum += right.getInteger(rightIndex);
+            sum += ${in2}.getInteger(rightIndex);
             rightIndex--;
 
             if (sum >= org.apache.drill.common.util.DecimalUtility.DIGITS_BASE) {
@@ -294,7 +294,7 @@ public class ${type.name}Functions {
              * becomes addition
              */
             if (left.sign != right.sign) {
-                <@addBlock holderType=type.name left="left" right="right" result="result"/>
+                <@addBlock holderType=type.name in1="left" in2="right" result="result"/>
                 result.sign = left.sign;
             } else {
                 /* Sign of the inputs are the same, meaning we have to perform subtraction
@@ -305,7 +305,9 @@ public class ${type.name}Functions {
                 <@compareBlock holderType=type.name left="left" right="right" absCompare="true" output="cmp"/>
 
                 if (cmp == -1) {
-                    left.swap(right);
+                  <@subtractBlock holderType=type.name in1="right" in2="left" result="result"/>
+                } else {
+                  <@subtractBlock holderType=type.name in1="left" in2="right" result="result"/>
                 }
 
                 //Determine the sign of the result
@@ -314,10 +316,6 @@ public class ${type.name}Functions {
                 } else {
                     result.sign = false;
                 }
-
-                // Perform the subtraction
-                <@subtractBlock holderType=type.name left="left" right="right" result="result"/>
-
             }
 
         }
@@ -356,21 +354,19 @@ public class ${type.name}Functions {
                 <@compareBlock holderType=type.name left="left" right="right" absCompare="true" output="cmp"/>
 
                 if (cmp == -1) {
-                    left.swap(right);
+                    <@subtractBlock holderType=type.name in1="right" in2="left" result="result"/>
+                    result.sign = right.sign;
+                } else {
+                    <@subtractBlock holderType=type.name in1="left" in2="right" result="result"/>
+                    result.sign = left.sign;
                 }
-                /* Perform the subtraction */
-                <@subtractBlock holderType=type.name left="left" right="right" result="result"/>
+
+
             } else {
                 /* Sign of the two input decimals is the same, use the add logic */
-                <@addBlock holderType=type.name left="left" right="right" result="result"/>
+                <@addBlock holderType=type.name in1="left" in2="right" result="result"/>
+                result.sign = left.sign;
             }
-
-            /* Assign the result to be the sign of the left input
-             * If the two input signs are the same, we can choose either to be the resulting sign
-             * If the two input signs are different, we assign left input to be the greater absolute value
-             * hence result will have the same sign as left
-             */
-            result.sign = left.sign;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a78ff8c/exec/java-exec/src/main/codegen/templates/ValueHolders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ValueHolders.java b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
index 7272f4f..5cea7e3 100644
--- a/exec/java-exec/src/main/codegen/templates/ValueHolders.java
+++ b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
@@ -82,33 +82,6 @@ public final class ${className} implements ValueHolder{
         buffer.setInt(start + (index * 4), value);
     }
 
-    // TODO: This is a temporary hack to swap holders. We need a generic solution for this issue
-    public void swap(${className} right) {
-        int tempScale = this.scale;
-        int tempPrec = this.precision;
-        boolean tempSign = this.sign;
-        ByteBuf tempBuf = this.buffer;
-        int start = this.start;
-
-        this.scale = right.scale;
-        this.precision = right.precision;
-        this.sign = right.sign;
-        this.buffer = right.buffer;
-        this.start = right.start;
-
-        right.scale = tempScale;
-        right.precision = tempPrec;
-        right.sign = tempSign;
-        right.buffer = tempBuf;
-        right.start = start;
-
-        <#if mode.prefix == "Nullable">
-        int isSet = this.isSet;
-        this.isSet = right.isSet;
-        right.isSet = isSet;
-        </#if>
-    }
-
     <#else>
     public ${minor.javaType!type.javaType} value;
     </#if>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a78ff8c/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
index 78bcd95..05884e5 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
@@ -439,4 +439,17 @@ public class TestFunctionsQuery {
     JdbcAssert.withNoDefaultSchema()
         .sql(query);
   }
+
+  @Test
+  public void testDecimalAddConstant() throws Exception {
+    String query = "select (cast('-1' as decimal(38, 3)) + cast (employee_id as decimal(38, 3))) as CNT " +
+        "from cp.`employee.json` where employee_id <= 4";
+
+    JdbcAssert.withNoDefaultSchema()
+        .sql(query)
+        .returns(
+            "CNT=0.000\n" +
+            "CNT=1.000\n" +
+            "CNT=3.000\n");
+  }
 }


[13/14] git commit: DRILL-783: Convert function support in HBase filter push down.

Posted by ja...@apache.org.
DRILL-783: Convert function support in HBase filter push down.

+ Enable HBase test suite (failures fixed by DRILL-761).


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e9e63c4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e9e63c4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e9e63c4a

Branch: refs/heads/master
Commit: e9e63c4a7f3ac354dc346799f19bdccc56d8bd0e
Parents: 6d4dc8f
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Fri May 16 17:48:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:32 2014 -0700

----------------------------------------------------------------------
 .../common/expression/ConvertExpression.java    |  23 +-
 .../expression/ExpressionStringBuilder.java     |   2 +-
 .../store/hbase/CompareFunctionsProcessor.java  | 232 +++++++++++++++++++
 .../exec/store/hbase/HBaseFilterBuilder.java    |  74 ++----
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   1 -
 .../drill/hbase/TestHBaseFilterPushDown.java    |   9 +-
 .../drill/exec/expr/EvaluationVisitor.java      |   2 +-
 .../exec/expr/ExpressionTreeMaterializer.java   |   2 +-
 8 files changed, 278 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java b/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
index c028083..9debd15 100644
--- a/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
+++ b/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
@@ -28,23 +28,26 @@ public class ConvertExpression extends LogicalExpressionBase implements Iterable
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertExpression.class);
 
+  public static final String CONVERT_FROM = "convert_from";
+  public static final String CONVERT_TO = "convert_to";
+  
   private final LogicalExpression input;
   private final MajorType type;
   private final String convertFunction;
-  private final String conversionType;
+  private final String encodingType;
 
   /**
-   * @param conversionType
+   * @param encodingType
    * @param convertFunction
    * @param input
    * @param pos
    */
-  public ConvertExpression(String convertFunction, String conversionType, LogicalExpression input, ExpressionPosition pos) {
+  public ConvertExpression(String convertFunction, String encodingType, LogicalExpression input, ExpressionPosition pos) {
     super(pos);
     this.input = input;
-    this.convertFunction = convertFunction.toLowerCase();
-    this.conversionType = conversionType.toUpperCase();
-    this.type = Types.getMajorTypeFromName(conversionType.split("_", 2)[0].toLowerCase());
+    this.convertFunction = CONVERT_FROM.equals(convertFunction.toLowerCase()) ? CONVERT_FROM : CONVERT_TO;
+    this.encodingType = encodingType.toUpperCase();
+    this.type = Types.getMajorTypeFromName(encodingType.split("_", 2)[0].toLowerCase());
   }
 
   @Override
@@ -70,13 +73,13 @@ public class ConvertExpression extends LogicalExpressionBase implements Iterable
     return type;
   }
 
-  public String getConversionType() {
-    return conversionType;
+  public String getEncodingType() {
+    return encodingType;
   }
 
   @Override
   public String toString() {
-    return "ConvertExpression [input=" + input + ", type=" + type + ", convertFunction="
-        + convertFunction + ", conversionType=" + conversionType + "]";
+    return "ConvertExpression [input=" + input + ", type=" + Types.toString(type) + ", convertFunction="
+        + convertFunction + ", conversionType=" + encodingType + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java b/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
index 9588863..9301528 100644
--- a/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
+++ b/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
@@ -226,7 +226,7 @@ public class ExpressionStringBuilder extends AbstractExprVisitor<Void, StringBui
   public Void visitConvertExpression(ConvertExpression e, StringBuilder sb) throws RuntimeException {
     sb.append(e.getConvertFunction()).append("(");
     e.getInput().accept(this, sb);
-    sb.append(", \"").append(e.getConversionType()).append("\")");
+    sb.append(", \"").append(e.getEncodingType()).append("\")");
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
new file mode 100644
index 0000000..6810f81
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.nio.ByteOrder;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+  private byte[] value;
+  private boolean success;
+  private boolean isEqualityFn;
+  private SchemaPath path;
+  private String functionName;
+
+  public static boolean isCompareFunction(String functionName) {
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+  }
+
+  public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) {
+    String functionName = call.getName();
+    LogicalExpression nameArg = call.args.get(0);
+    LogicalExpression valueArg = call.args.size() == 2 ? call.args.get(1) : null;
+    CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
+
+    if (valueArg != null) { // binary function
+      if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+        LogicalExpression swapArg = valueArg;
+        valueArg = nameArg;
+        nameArg = swapArg;
+        evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+      }
+      evaluator.success = nameArg.accept(evaluator, valueArg);
+    } else if (nullComparatorSupported && call.args.get(0) instanceof SchemaPath) {
+      evaluator.success = true;
+      evaluator.path = (SchemaPath) nameArg;
+    }
+
+    return evaluator;
+  }
+
+  public CompareFunctionsProcessor(String functionName) {
+    this.success = false;
+    this.functionName = functionName;
+    this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
+        && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
+  }
+
+  public byte[] getValue() {
+    return value;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public SchemaPath getPath() {
+    return path;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  @Override
+  public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
+    if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
+      return e.getInput().accept(this, valueArg);
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
+    if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM && e.getInput() instanceof SchemaPath) {
+      ByteBuf bb = null;
+      String encodingType = e.getEncodingType();
+      switch (encodingType) {
+      case "INT_BE":
+      case "INT":
+      case "UINT_BE":
+      case "UINT":
+      case "UINT4_BE":
+      case "UINT4":
+        if (valueArg instanceof IntExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+          bb = Unpooled.wrappedBuffer(new byte[4]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb.writeInt(((IntExpression)valueArg).getInt());
+        }
+        break;
+      case "BIGINT_BE":
+      case "BIGINT":
+      case "UINT8_BE":
+      case "UINT8":
+        if (valueArg instanceof LongExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+          bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb.writeLong(((LongExpression)valueArg).getLong());
+        }
+        break;
+      case "FLOAT":
+        if (valueArg instanceof FloatExpression && isEqualityFn) {
+          bb = Unpooled.wrappedBuffer(new byte[4]).order(ByteOrder.BIG_ENDIAN);
+          bb.writeFloat(((FloatExpression)valueArg).getFloat());
+        }
+        break;
+      case "DOUBLE":
+        if (valueArg instanceof DoubleExpression && isEqualityFn) {
+          bb = Unpooled.wrappedBuffer(new byte[8]).order(ByteOrder.BIG_ENDIAN);;
+          bb.writeDouble(((DoubleExpression)valueArg).getDouble());
+        }
+        break;
+      case "TIME_EPOCH":
+      case "TIME_EPOCH_BE":
+        if (valueArg instanceof TimeExpression) {
+          bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb.writeLong(((TimeExpression)valueArg).getTime());
+        }
+        break;
+      case "DATE_EPOCH":
+      case "DATE_EPOCH_BE":
+        if (valueArg instanceof DateExpression) {
+          bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+          bb.writeLong(((DateExpression)valueArg).getDate());
+        }
+        break;
+      case "BOOLEAN_BYTE":
+        if (valueArg instanceof BooleanExpression) {
+          bb = Unpooled.wrappedBuffer(new byte[1]);
+          bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
+        }
+        break;
+      case "UTF8":
+        // let visitSchemaPath() handle this.
+        return e.getInput().accept(this, valueArg);
+      }
+
+      if (bb != null) {
+        this.value = bb.array();
+        this.path = (SchemaPath)e.getInput();
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+    return false;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+    if (valueArg instanceof QuotedString) {
+      this.value = ((QuotedString) valueArg).value.getBytes();
+      this.path = path;
+      return true;
+    }
+    return false;
+  }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+        .add(BooleanExpression.class)
+        .add(DateExpression.class)
+        .add(DoubleExpression.class)
+        .add(FloatExpression.class)
+        .add(IntExpression.class)
+        .add(LongExpression.class)
+        .add(QuotedString.class)
+        .add(TimeExpression.class)
+        .build();
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+        // unary functions
+        .put("isnotnull", "isnotnull")
+        .put("isNotNull", "isNotNull")
+        .put("is not null", "is not null")
+        .put("isnull", "isnull")
+        .put("isNull", "isNull")
+        .put("is null", "is null")
+        // binary functions
+        .put("equal", "equal")
+        .put("not_equal", "not_equal")
+        .put("greater_than_or_equal_to", "less_than_or_equal_to")
+        .put("greater_than", "less_than")
+        .put("less_than_or_equal_to", "greater_than_or_equal_to")
+        .put("less_than", "greater_than")
+        .build();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
index 924cd6e..ad26972 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -19,13 +19,10 @@ package org.apache.drill.exec.store.hbase;
 
 import java.util.Arrays;
 
-import org.apache.drill.common.expression.CastExpression;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -36,8 +33,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
 
 public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
 
@@ -54,7 +49,17 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
 
   public HBaseScanSpec parseTree() {
     HBaseScanSpec parsedSpec = le.accept(this, null);
-    return parsedSpec != null ? mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec ) : null;
+    if (parsedSpec != null) {
+      parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec);
+      /*
+       * If RowFilter is THE filter attached to the scan specification,
+       * remove it since its effect is also achieved through startRow and stopRow.
+       */
+      if (parsedSpec.filter instanceof RowFilter) {
+        parsedSpec.filter = null;
+      }
+    }
+    return parsedSpec;
   }
 
   public boolean isAllExpressionsConverted() {
@@ -72,22 +77,18 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
     HBaseScanSpec nodeScanSpec = null;
     String functionName = call.getName();
     ImmutableList<LogicalExpression> args = call.args;
-    if (COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)) {
-      LogicalExpression nameArg = args.get(0);
-      LogicalExpression valueArg = args.get(1);
-      if (nameArg instanceof QuotedString) {
-        valueArg = nameArg;
-        nameArg = args.get(1);
-        functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
-      }
 
-      while (nameArg instanceof CastExpression
-          && nameArg.getMajorType().getMinorType() == MinorType.VARCHAR) {
-        nameArg = ((CastExpression) nameArg).getInput();
-      }
+    if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
+      /*
+       * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
+       * causes a filter with NullComparator to fail. Enable only if specified in
+       * the configuration (after ensuring that the HBase cluster has the fix).
+       */
+      boolean nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false);
 
-      if (nameArg instanceof SchemaPath && valueArg instanceof QuotedString) {
-        nodeScanSpec = createHBaseScanSpec(functionName, (SchemaPath) nameArg, ((QuotedString) valueArg).value.getBytes());
+      CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported);
+      if (processor.isSuccess()) {
+        nodeScanSpec = createHBaseScanSpec(processor.getFunctionName(), processor.getPath(), processor.getValue());
       }
     } else {
       switch (functionName) {
@@ -104,28 +105,13 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
           }
         }
         break;
-      case "isnotnull":
-      case "isNotNull":
-      case "is not null":
-      case "isnull":
-      case "isNull":
-      case "is null":
-        /*
-         * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
-         * causes a filter with NullComparator to fail. Enable only if specified in
-         * the configuration (after ensuring that the HBase cluster has the fix).
-         */
-        if (groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false)) {
-          if (args.get(0) instanceof SchemaPath) {
-            nodeScanSpec = createHBaseScanSpec(functionName, ((SchemaPath) args.get(0)), null);
-          }
-        }
       }
     }
 
     if (nodeScanSpec == null) {
       allExpressionsConverted = false;
     }
+
     return nodeScanSpec;
   }
 
@@ -182,7 +168,8 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
     case "greater_than":
       compareOp = CompareOp.GREATER;
       if (isRowKey) {
-        startRow = fieldValue;
+        // startRow should be just greater than 'value'
+        startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
       }
       break;
     case "less_than_or_equal_to":
@@ -240,17 +227,4 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
     return null;
   }
 
-  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
-  static {
-   Builder<String, String> builder = ImmutableMap.builder();
-   COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
-       .put("equal", "equal")
-       .put("not_equal", "not_equal")
-       .put("greater_than_or_equal_to", "less_than_or_equal_to")
-       .put("greater_than", "less_than")
-       .put("less_than_or_equal_to", "greater_than_or_equal_to")
-       .put("less_than", "greater_than")
-       .build();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 3e91361..3881f4d 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -33,7 +33,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 
-@Ignore("Need to fix HBaseRecordReader")
 @RunWith(Suite.class)
 @SuiteClasses({
   HBaseRecordReaderTest.class,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 6dd418c..76300b7 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.hbase;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -56,15 +60,14 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   }
 
   @Test
-  @Ignore("Until convert_from() functions are working.")
   public void testFilterPushDownConvertExpression() throws Exception {
     runSQLVerifyCount("SELECT\n"
         + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
         + "WHERE\n"
-        + "  convert_from(row_key, 'INT_BE') > 12"
-        , -1);
+        + "  convert_from(row_key, 'UTF8') > 'b4'"
+        , 2);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 731ab6b..b03882e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -541,7 +541,7 @@ public class EvaluationVisitor {
 
     @Override
     public HoldingContainer visitConvertExpression(ConvertExpression e, ClassGenerator<?> value) throws RuntimeException {
-      String convertFunctionName = e.getConvertFunction() + e.getConversionType();
+      String convertFunctionName = e.getConvertFunction() + e.getEncodingType();
 
       List<LogicalExpression> newArgs = Lists.newArrayList();
       newArgs.add(e.getInput());  //input_expr

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 0267be3..6297148 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -326,7 +326,7 @@ public class ExpressionTreeMaterializer {
 
     @Override
     public LogicalExpression visitConvertExpression(ConvertExpression e, FunctionImplementationRegistry value) {
-      String convertFunctionName = e.getConvertFunction() + e.getConversionType();
+      String convertFunctionName = e.getConvertFunction() + e.getEncodingType();
 
       List<LogicalExpression> newArgs = Lists.newArrayList();
       newArgs.add(e.getInput());  //input_expr


[14/14] git commit: DRILL-781: Use MapVector as the top level vector for HBase Column Families

Posted by ja...@apache.org.
DRILL-781: Use MapVector as the top level vector for HBase Column Families


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d7e3d3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d7e3d3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d7e3d3a

Branch: refs/heads/master
Commit: 5d7e3d3ab548eb2b23607df46ea843a9c1532b72
Parents: e9e63c4
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon May 19 15:28:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:36 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     | 160 ++++++-------------
 .../exec/store/hbase/HBaseSchemaFactory.java    |   1 -
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   1 -
 .../drill/hbase/TestHBaseFilterPushDown.java    |  16 +-
 .../drill/hbase/TestHBaseProjectPushDown.java   |   7 +-
 .../drill/exec/record/MaterializedField.java    |   3 +-
 6 files changed, 69 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index ae9f833..439f97f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -50,17 +52,17 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
 
-  private List<SchemaPath> columns;
+  private LinkedHashSet<SchemaPath> columns;
   private OutputMutator outputMutator;
 
-  private Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
+  private Map<String, MapVector> familyVectorMap;
   private VarBinaryVector rowKeyVector;
   private SchemaPath rowKeySchemaPath;
 
@@ -78,34 +80,29 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     hbaseTable = subScanSpec.getTableName();
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
     boolean rowKeyOnly = true;
+    this.columns = Sets.newLinkedHashSet();
     if (projectedColumns != null && projectedColumns.size() != 0) {
-      /*
-       * This will change once the non-scaler value vectors are available.
-       * Then, each column family will have a single top level value vector
-       * and each column will be an item vector in its corresponding TLV.
-       */
-      this.columns = Lists.newArrayList(projectedColumns);
-      Iterator<SchemaPath> columnIterator = columns.iterator();
+      Iterator<SchemaPath> columnIterator = projectedColumns.iterator();
       while(columnIterator.hasNext()) {
         SchemaPath column = columnIterator.next();
-        if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) {
+        if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) {
           rowKeySchemaPath = ROW_KEY_PATH;
+          this.columns.add(rowKeySchemaPath);
           continue;
         }
         rowKeyOnly = false;
         NameSegment root = column.getRootSegment();
-        byte[] family = root.getPath().toString().getBytes();
+        byte[] family = root.getPath().getBytes();
+        this.columns.add(SchemaPath.getSimplePath(root.getPath()));
         PathSegment child = root.getChild();
         if (child != null && child.isNamed()) {
-          byte[] qualifier = child.getNameSegment().getPath().toString().getBytes();
+          byte[] qualifier = child.getNameSegment().getPath().getBytes();
           hbaseScan.addColumn(family, qualifier);
         } else {
-          columnIterator.remove();
           hbaseScan.addFamily(family);
         }
       }
     } else {
-      this.columns = Lists.newArrayList();
       rowKeyOnly = false;
       rowKeySchemaPath = ROW_KEY_PATH;
       this.columns.add(rowKeySchemaPath);
@@ -128,16 +125,16 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     this.outputMutator = output;
-    vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
+    familyVectorMap = new HashMap<String, MapVector>();
 
     try {
       // Add Vectors to output in the order specified when creating reader
       for (SchemaPath column : columns) {
         if (column.equals(rowKeySchemaPath)) {
           MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
-          rowKeyVector = output.addField(field, VarBinaryVector.class);
-        } else if (column.getRootSegment().getChild() != null) {
-          getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
+          rowKeyVector = outputMutator.addField(field, VarBinaryVector.class);
+        } else {
+          getOrCreateFamilyVector(column.getRootSegment().getPath(), false);
         }
       }
       logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
@@ -158,12 +155,14 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       rowKeyVector.clear();
       rowKeyVector.allocateNew();
     }
-    for (ValueVector v : vvMap.values()) {
+    for (ValueVector v : familyVectorMap.values()) {
       v.clear();
       v.allocateNew();
     }
 
-    for (int count = 0; count < TARGET_RECORD_COUNT; count++) {
+    int rowCount = 0;
+    done:
+    for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
       Result result = null;
       try {
         if (leftOver != null) {
@@ -176,54 +175,54 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
         throw new DrillRuntimeException(e);
       }
       if (result == null) {
-        setOutputValueCount(count);
-        logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
-        return count;
+        break done;
       }
 
       // parse the result and populate the value vectors
       KeyValue[] kvs = result.raw();
       byte[] bytes = result.getBytes().get();
       if (rowKeyVector != null) {
-        if (!rowKeyVector.getMutator().setSafe(count, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) {
-          setOutputValueCount(count);
+        if (!rowKeyVector.getMutator().setSafe(rowCount, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) {
           leftOver = result;
-          logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
-          return count;
+          break done;
         }
       }
+
       for (KeyValue kv : kvs) {
         int familyOffset = kv.getFamilyOffset();
         int familyLength = kv.getFamilyLength();
+        MapVector mv = getOrCreateFamilyVector(new String(bytes, familyOffset, familyLength), true);
+
         int qualifierOffset = kv.getQualifierOffset();
         int qualifierLength = kv.getQualifierLength();
+        NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(bytes, qualifierOffset, qualifierLength));
+
         int valueOffset = kv.getValueOffset();
         int valueLength = kv.getValueLength();
-        NullableVarBinaryVector v = getOrCreateColumnVector(
-            new FamilyQualifierWrapper(bytes, familyOffset, familyLength, qualifierOffset, qualifierLength), true);
-        if (!v.getMutator().setSafe(count, bytes, valueOffset, valueLength)) {
-          setOutputValueCount(count);
+        if (!v.getMutator().setSafe(rowCount, bytes, valueOffset, valueLength)) {
           leftOver = result;
-          logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
-          return count;
+          return rowCount;
         }
       }
     }
-    setOutputValueCount(TARGET_RECORD_COUNT);
-    logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), TARGET_RECORD_COUNT);
-    return TARGET_RECORD_COUNT;
+
+    setOutputRowCount(rowCount);
+    logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+    return rowCount;
   }
 
-  private NullableVarBinaryVector getOrCreateColumnVector(FamilyQualifierWrapper column, boolean allocateOnCreate) {
+  private MapVector getOrCreateFamilyVector(String familyName, boolean allocateOnCreate) {
     try {
-      NullableVarBinaryVector v = vvMap.get(column);
+      MapVector v = familyVectorMap.get(familyName);
       if(v == null) {
-        MaterializedField field = MaterializedField.create(column.asSchemaPath(), Types.optional(TypeProtos.MinorType.VARBINARY));
-        v = outputMutator.addField(field, NullableVarBinaryVector.class);
+        SchemaPath column = SchemaPath.getSimplePath(familyName);
+        MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.MAP));
+        v = outputMutator.addField(field, MapVector.class);
         if (allocateOnCreate) {
           v.allocateNew();
         }
-        vvMap.put(column, v);
+        columns.add(column);
+        familyVectorMap.put(familyName, v);
       }
       return v;
     } catch (SchemaChangeException e) {
@@ -231,6 +230,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
   }
 
+  private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qualifier) {
+    int oldSize = mv.size();
+    NullableVarBinaryVector v = mv.addOrGet(qualifier, Types.optional(TypeProtos.MinorType.VARBINARY), NullableVarBinaryVector.class);
+    if (oldSize != mv.size()) {
+      v.allocateNew();
+    }
+    return v;
+  }
+
   @Override
   public void cleanup() {
     try {
@@ -245,8 +253,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
   }
 
-  private void setOutputValueCount(int count) {
-    for (ValueVector vv : vvMap.values()) {
+  private void setOutputRowCount(int count) {
+    for (ValueVector vv : familyVectorMap.values()) {
       vv.getMutator().setValueCount(count);
     }
     if (rowKeyVector != null) {
@@ -254,68 +262,4 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
   }
 
-  private static class FamilyQualifierWrapper implements Comparable<FamilyQualifierWrapper> {
-    int hashCode;
-    protected String stringVal;
-    protected String family;
-    protected String qualifier;
-
-    public FamilyQualifierWrapper(SchemaPath column) {
-      this(column.getRootSegment().getPath(), column.getRootSegment().getChild().getNameSegment().getPath());
-    }
-
-    public FamilyQualifierWrapper(byte[] bytes, int familyOffset, int familyLength, int qualifierOffset, int qualifierLength) {
-      this(new String(bytes, familyOffset, familyLength), new String(bytes, qualifierOffset, qualifierLength));
-    }
-
-    public FamilyQualifierWrapper(String family, String qualifier) {
-      this.family = family;
-      this.qualifier = qualifier;
-      hashCode = 31*family.hashCode() + qualifier.hashCode();
-    }
-
-    @Override
-    public int hashCode() {
-      return this.hashCode;
-    }
-
-    @Override
-    public boolean equals(Object anObject) {
-      if (this == anObject) {
-        return true;
-      }
-      if (anObject instanceof FamilyQualifierWrapper) {
-        FamilyQualifierWrapper that = (FamilyQualifierWrapper) anObject;
-        // we compare qualifier first since many columns will have same family
-        if (!qualifier.equals(that.qualifier)) {
-          return false;
-        }
-        return family.equals(that.family);
-      }
-      return false;
-    }
-
-    @Override
-    public String toString() {
-      if (stringVal == null) {
-        stringVal = new StringBuilder().append(new String(family)).append(".").append(new String(qualifier)).toString();
-      }
-      return stringVal;
-    }
-
-    public SchemaPath asSchemaPath() {
-      return SchemaPath.getCompoundPath(family, qualifier);
-    }
-
-    @Override
-    public int compareTo(FamilyQualifierWrapper o) {
-      int val = family.compareTo(o.family);
-      if (val != 0) {
-        return val;
-      }
-      return qualifier.compareTo(o.qualifier);
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index ce3b9fd..84f363b 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 3881f4d..e30f79e 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 76300b7..90404b7 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,11 +17,6 @@
  */
 package org.apache.drill.hbase;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestHBaseFilterPushDown extends BaseHBaseTest {
@@ -49,6 +44,17 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   }
 
   @Test
+  public void testFilterPushDownRowKeyBetween() throws Exception {
+    runSQLVerifyCount("SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "WHERE\n"
+        + "  row_key BETWEEN 'a2' AND 'b4'"
+        , 3);
+  }
+
+  @Test
   public void testFilterPushDownMultiColumns() throws Exception {
     runSQLVerifyCount("SELECT\n"
         + "  *\n"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index 88194d5..b66d2ed 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -33,7 +33,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   @Test
   public void testColumnWith1RowPushDown() throws Exception{
     runSQLVerifyCount("SELECT\n"
-        + "f2['c7']\n"
+        + "f2['c7'] as `f[c7]`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
         , 1);
@@ -43,7 +43,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   public void testRowKeyAndColumnPushDown() throws Exception{
     setColumnWidth(9);
     runSQLVerifyCount("SELECT\n"
-        + "row_key, f['c1']*31 as `f['c1']*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n"
+        + "row_key, f['c1']*31 as `f[c1]*31`, f['c2'] as `f[c2]`, 5 as `5`, 'abc' as `'abc'`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
         , 6);
@@ -51,8 +51,9 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
 
   @Test
   public void testColumnFamilyPushDown() throws Exception{
+    setColumnWidth(74);
     runSQLVerifyCount("SELECT\n"
-        + "f\n"
+        + "f, f2\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
         , 6);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 439552f..3d749d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 
@@ -202,7 +203,7 @@ public class MaterializedField{
 
   @Override
   public String toString() {
-    return "MaterializedField [path=" + path + ", type=" + type + "]";
+    return "MaterializedField [path=" + path + ", type=" + Types.toString(type) + "]";
   }
 
   public String toExpr(){


[05/14] git commit: DRILL-770: Fix decimal math functions with constants

Posted by ja...@apache.org.
DRILL-770: Fix decimal math functions with constants


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/62c0b1ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/62c0b1ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/62c0b1ba

Branch: refs/heads/master
Commit: 62c0b1ba384fd2b3c7a9af12ed75341a4294f331
Parents: 5a78ff8
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 18:31:57 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 10:43:26 2014 -0700

----------------------------------------------------------------------
 .../codegen/templates/Decimal/DecimalFunctions.java  |  2 ++
 .../apache/drill/exec/resolver/TypeCastRules.java    |  6 ------
 .../apache/drill/jdbc/test/TestFunctionsQuery.java   | 15 +++++++++++++++
 3 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62c0b1ba/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
index 8f14e83..cff122e 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
@@ -237,8 +237,10 @@ import org.apache.drill.exec.expr.annotations.Workspace;
 
             if (left.scale < right.scale) {
                 left.value = (${javaType}) (left.value * Math.pow(10, (right.scale - left.scale)));
+                left.scale = right.scale;
             } else if (right.scale < left.scale) {
                 right.value = (${javaType}) (right.value * Math.pow(10, (left.scale - right.scale)));
+                right.scale = left.scale;
             }
 </#macro>
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62c0b1ba/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
index 515843d..2f6bf38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
@@ -420,12 +420,6 @@ public class TypeCastRules {
     rule.add(MinorType.UINT2);
     rule.add(MinorType.UINT4);
     rule.add(MinorType.UINT8);
-    rule.add(MinorType.DECIMAL9);
-    rule.add(MinorType.DECIMAL18);
-    rule.add(MinorType.DECIMAL28SPARSE);
-    rule.add(MinorType.DECIMAL28DENSE);
-    rule.add(MinorType.DECIMAL38SPARSE);
-    rule.add(MinorType.DECIMAL38DENSE);
     rule.add(MinorType.DATE);
     rule.add(MinorType.TIME);
     rule.add(MinorType.TIMESTAMPTZ);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62c0b1ba/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
index 05884e5..66ae477 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
@@ -452,4 +452,19 @@ public class TestFunctionsQuery {
             "CNT=1.000\n" +
             "CNT=3.000\n");
   }
+
+  @Test
+  public void testDecimalAddIntConstant() throws Exception {
+    String query = "select 1 + cast(employee_id as decimal(9, 3)) as DEC_9 , 1 + cast(employee_id as decimal(38, 5)) as DEC_38 " +
+        "from cp.`employee.json` where employee_id <= 2";
+
+    JdbcAssert.withNoDefaultSchema()
+        .sql(query)
+        .returns(
+            "DEC_9=2.000; " +
+            "DEC_38=2.00000\n" +
+            "DEC_9=3.000; " +
+            "DEC_38=3.00000\n");
+  }
+
 }


[08/14] git commit: DRILL-776: Support SelectionVector SV2 and SV4 in Partition Sender.

Posted by ja...@apache.org.
DRILL-776: Support SelectionVector SV2 and SV4 in Partition Sender.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e9ac37db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e9ac37db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e9ac37db

Branch: refs/heads/master
Commit: e9ac37dbff8f0b606673bb5827c0daac1d3275b0
Parents: c40735e
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Mon May 19 08:35:34 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:19 2014 -0700

----------------------------------------------------------------------
 .../PartitionSenderRootExec.java                | 69 +++++++++++++++-----
 .../impl/partitionsender/Partitioner.java       |  5 ++
 .../partitionsender/PartitionerSV2Template.java | 60 +++++++++++++++++
 .../partitionsender/PartitionerSV4Template.java | 61 +++++++++++++++++
 .../physical/HashToRandomExchangePrel.java      |  5 ++
 .../org/apache/drill/TestExampleQueries.java    | 10 +++
 6 files changed, 193 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bcd484c..f574351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -182,8 +182,24 @@ public class PartitionSenderRootExec implements RootExec {
     // set up partitioning function
     final LogicalExpression expr = operator.getExpr();
     final ErrorCollector collector = new ErrorCollectorImpl();
-    final ClassGenerator<Partitioner> cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION,
-                                                                         context.getFunctionRegistry());
+    final ClassGenerator<Partitioner> cg ;
+
+    boolean hyper = false;
+
+    switch(incoming.getSchema().getSelectionVectorMode()){
+    case NONE:
+      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+      break;
+    case TWO_BYTE:
+      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV2, context.getFunctionRegistry());
+      break;
+    case FOUR_BYTE:
+      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV4, context.getFunctionRegistry());
+      hyper = true;
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
 
     final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
     if (collector.hasErrors()) {
@@ -255,22 +271,41 @@ public class PartitionSenderRootExec implements RootExec {
       Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
                                                        vvIn.getField().getType().getMode());
       JClass vvClass = cg.getModel().ref(vvType);
-      // the following block generates calls to copyFrom(); e.g.:
-      // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
-      //                                                     outgoingBatches[bucket].getRecordCount(),
-      //                                                     vv1);
-      cg.getEvalBlock()._if(
-        ((JExpression) JExpr.cast(vvClass,
-              ((JExpression)
-                     outgoingVectors
-                       .component(bucket))
-                       .component(JExpr.lit(fieldId))))
-                       .invoke("copyFromSafe")
-                       .arg(inIndex)
-                       .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
-                       .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
-                       ._return();
 
+      if (!hyper) {
+        // the following block generates a guarded call to copyFromSafe(); e.g.:
+        // if (!((IntVector) outgoingVectors[bucket][0]).copyFromSafe(inIndex,
+        //                                                     outgoingBatches[bucket].getRecordCount(),
+        //                                                     vv1)) { outgoingBatches[bucket].flush(); return; }
+        cg.getEvalBlock()._if(
+          ((JExpression) JExpr.cast(vvClass,
+                ((JExpression)
+                       outgoingVectors
+                         .component(bucket))
+                         .component(JExpr.lit(fieldId))))
+                         .invoke("copyFromSafe")
+                         .arg(inIndex)
+                         .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
+                         .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+                         ._return();
+      } else {
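+        // the following block generates a guarded call to copyFromSafe(); e.g.:
+        // if (!((IntVector) outgoingVectors[bucket][0]).copyFromSafe(inIndex,
+        //         outgoingBatches[bucket].getRecordCount(), vv1[((inIndex)>>> 16)])) { outgoingBatches[bucket].flush(); return; }
+        // NOTE(review): inIndex is passed unmasked as the source index although its upper bits select the vector -- confirm no (inIndex & 0xFFFF) is needed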
+        cg.getEvalBlock()._if(
+          ((JExpression) JExpr.cast(vvClass,
+                ((JExpression)
+                       outgoingVectors
+                         .component(bucket))
+                         .component(JExpr.lit(fieldId))))
+                         .invoke("copyFromSafe")
+                         .arg(inIndex)
+                         .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
+                         .arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+                         ._return();
+
+      }
       ++fieldId;
     }
     // generate the OutgoingRecordBatch helper invocations

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 7d3998b..3ffead0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -31,4 +31,9 @@ public interface Partitioner {
   public abstract void partitionBatch(RecordBatch incoming);
 
   public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
+
+  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV2 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV2Template.class);
+
+  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV4 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV4Template.class);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
new file mode 100644
index 0000000..981055a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class PartitionerSV2Template implements Partitioner {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV2Template.class);
+  // Selection vector taken from the incoming batch in setup() and read by partitionBatch().
+  private SelectionVector2 sv2;
+
+  public PartitionerSV2Template() throws SchemaChangeException {
+  }
+  // Stores incoming.getSelectionVector2() and then delegates to doSetup() with the same arguments.
+  @Override
+  public final void setup(FragmentContext context,
+                          RecordBatch incoming,
+                          OutgoingRecordBatch[] outgoing) throws SchemaChangeException {
+
+    this.sv2 = incoming.getSelectionVector2();
+
+    doSetup(context, incoming, outgoing);
+
+  }
+  // Calls doEval() once for each of incoming.getRecordCount() positions, in ascending order.
+  @Override
+  public void partitionBatch(RecordBatch incoming) {
+    // NOTE(review): reads the sv2 saved by setup(), not one fetched from this incoming -- confirm they match
+    for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
+      // inIndex is the sv2 entry at position recordId; outIndex is always passed as 0
+      doEval(sv2.getIndex(recordId), 0);
+    }
+
+  }
+  // Hooks left abstract here; presumably filled in by generated code (see PartitionSenderRootExec) -- confirm.
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
new file mode 100644
index 0000000..2e00f9b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class PartitionerSV4Template implements Partitioner {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV4Template.class);
+  // Selection vector taken from the incoming batch in setup() and read by partitionBatch().
+  private SelectionVector4 sv4;
+
+  public PartitionerSV4Template() throws SchemaChangeException {
+  }
+  // Stores incoming.getSelectionVector4() and then delegates to doSetup() with the same arguments.
+  @Override
+  public final void setup(FragmentContext context,
+                          RecordBatch incoming,
+                          OutgoingRecordBatch[] outgoing) throws SchemaChangeException {
+
+    this.sv4 = incoming.getSelectionVector4();
+
+    doSetup(context, incoming, outgoing);
+
+  }
+  // Calls doEval() once for each of incoming.getRecordCount() positions, in ascending order.
+  @Override
+  public void partitionBatch(RecordBatch incoming) {
+    // NOTE(review): reads the sv4 saved by setup(), not one fetched from this incoming -- confirm they match
+    for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
+      // inIndex is the raw sv4 entry at position recordId (no masking or shifting here); outIndex is always 0
+      doEval(sv4.get(recordId), 0);
+    }
+
+  }
+  // Hooks left abstract here; presumably filled in by generated code (see PartitionSenderRootExec) -- confirm.
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index d582684..9756a76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -115,5 +115,10 @@ public class HashToRandomExchangePrel extends SinglePrel {
     return SelectionVectorMode.NONE;
   }
 
+  // Reports SelectionVectorMode.ALL as the supported input encodings (presumably every selection-vector mode -- confirm).
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.ALL;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 83b43fb..1757290 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -49,6 +49,16 @@ public class TestExampleQueries extends BaseTestQuery{
   }
 
   @Test
+  // Runs a filtered, grouped count; presumably the filter feeds the partitioner a two-byte selection vector -- confirm.
+  public void testHashPartitionSV2 () throws Exception{
+    test("select count(n_nationkey) from cp.`tpch/nation.parquet` where n_nationkey > 8 group by n_regionkey");
+  }
+  // Runs a grouped count ordered by the count; named for the four-byte selection vector path (which operator emits it is not shown here).
+  @Test
+  public void testHashPartitionSV4 () throws Exception{
+    test("select count(n_nationkey) as cnt from cp.`tpch/nation.parquet` group by n_regionkey order by cnt");
+  }
+  @Test
   public void testSelectWithLimit() throws Exception{
     test("select employee_id,  first_name, last_name from cp.`employee.json` limit 5 ");
   }


[09/14] git commit: DRILL-768: Support 2 phase COUNT() aggregates.

Posted by ja...@apache.org.
DRILL-768: Support 2 phase COUNT() aggregates.

Create a new SUM aggregate function whose return type is non-nullable to match the return type of COUNT.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/78ae2658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/78ae2658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/78ae2658

Branch: refs/heads/master
Commit: 78ae26589745b0ed538a15644f6fd6897cb9e5ad
Parents: e9ac37d
Author: Aman Sinha <as...@maprtech.com>
Authored: Fri May 16 10:33:10 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:39 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/AggPrelBase.java      | 180 +++++++++++++++++++
 .../exec/planner/physical/AggPruleBase.java     |   2 +-
 .../exec/planner/physical/HashAggPrel.java      |  51 +-----
 .../exec/planner/physical/HashAggPrule.java     |  10 +-
 .../exec/planner/physical/StreamAggPrel.java    |  72 ++------
 .../exec/planner/physical/StreamAggPrule.java   |  21 ++-
 6 files changed, 221 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
new file mode 100644
index 0000000..c3b1188
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.Aggregation;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.SqlAggFunction;
+import org.eigenbase.sql.SqlFunctionCategory;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.type.OperandTypes;
+import org.eigenbase.sql.type.ReturnTypes;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.ImmutableList;
+
+
+public abstract class AggPrelBase extends AggregateRelBase implements Prel{
+  // Common base of HashAggPrel and StreamAggPrel: derives grouping keys and aggregate expressions once, in the constructor.
+  protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
+
+  protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase
+  protected List<NamedExpression> keys = Lists.newArrayList();
+  protected List<NamedExpression> aggExprs = Lists.newArrayList();
+  protected List<AggregateCall> phase2AggCallList = Lists.newArrayList();
+  // keys, aggExprs and phase2AggCallList are filled by createKeysAndExprs();
+  // phase2AggCallList only receives entries when operPhase is PHASE_1of2.
+  /**
+   * Aggregate function named "SUM" that is used to add up partial COUNTs.
+   * The return type of COUNT is non-nullable while that of SUM is nullable, so
+   * this class reports the type handed to its constructor as its parameter type
+   * and as its return type, which lets the SUM keep COUNT's non-nullable type.
+   */
+  public class SqlSumCountAggFunction extends SqlAggFunction {
+    // The type returned by getParameterTypes(), getType() and getReturnType().
+    private final RelDataType type;
+    
+    public SqlSumCountAggFunction(RelDataType type) {
+      super("SUM",
+          SqlKind.OTHER_FUNCTION,
+          ReturnTypes.BIGINT, // use the inferred return type of SqlCountAggFunction
+          null,
+          OperandTypes.NUMERIC,
+          SqlFunctionCategory.NUMERIC);
+      
+      this.type = type;
+    }
+ 
+    public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
+      return ImmutableList.of(type);
+    }
+
+    public RelDataType getType() {
+      return type;
+    }
+
+    public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+      return type;
+    }
+    
+  }
+  // Records the phase before calling createKeysAndExprs(), which reads it through getOperatorPhase().
+  public AggPrelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+      List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+    super(cluster, traits, child, groupSet, aggCalls);
+    this.operPhase = phase;
+    createKeysAndExprs();
+  }
+  
+  public OperatorPhase getOperatorPhase() {
+    return operPhase;  
+  }
+  
+  public List<NamedExpression> getKeys() {
+    return keys;
+  }
+
+  public List<NamedExpression> getAggExprs() {
+    return aggExprs;  
+  }
+  
+  public List<AggregateCall> getPhase2AggCalls() {
+    return phase2AggCallList;  
+  }
+  // Populates keys and aggExprs, plus phase2AggCallList when this rel is the first of two phases.
+  protected void createKeysAndExprs() {
+    final List<String> childFields = getChild().getRowType().getFieldNames();
+    final List<String> fields = getRowType().getFieldNames();
+    // each grouping column becomes a key that reads the child field and is output under the same name
+    for (int group : BitSets.toIter(groupSet)) {
+      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+      keys.add(new NamedExpression(fr, fr));
+    }
+    // aggregate i is named after output field groupSet.cardinality() + i of this rel
+    for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+      int aggExprOrdinal = groupSet.cardinality() + aggCall.i;
+      FieldReference ref = new FieldReference(fields.get(aggExprOrdinal));
+      LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
+      NamedExpression ne = new NamedExpression(expr, ref);
+      aggExprs.add(ne);
+      // a first-phase aggregate also records the call its second phase should evaluate
+      if (getOperatorPhase() == OperatorPhase.PHASE_1of2) {
+        if (aggCall.e.getAggregation().getName().equals("COUNT")) {
+          // COUNT in Phase1of2 becomes a SUM over the partial counts in Phase2of2; the SUM keeps COUNT's type.
+          Aggregation sumAggFun = new SqlSumCountAggFunction(aggCall.e.getType());
+          AggregateCall newAggCall = 
+              new AggregateCall(
+                  sumAggFun, 
+                  aggCall.e.isDistinct(), 
+                  Collections.singletonList(aggExprOrdinal), 
+                  aggCall.e.getType(),
+                  aggCall.e.getName());
+          // newAggCall's only argument is aggExprOrdinal, this aggregate's position computed above
+          phase2AggCallList.add(newAggCall); 
+        } else {
+          phase2AggCallList.add(aggCall.e); // any other aggregate is reused as-is
+        }
+      }
+    }    
+  }
+  // Builds a FunctionCall named after the aggregation (lower-cased) over FieldReferences to the call's arguments.
+  protected LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
+    List<LogicalExpression> args = Lists.newArrayList();    
+    for(Integer i : call.getArgList()){
+      args.add(new FieldReference(fn.get(i)));
+    }
+
+    // for count(1): a call with no arguments gets the literal 1 as its only argument.
+    if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
+    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
+    return expr;
+  }
+  
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 563458e..4edeaf8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -67,7 +67,7 @@ public abstract class AggPruleBase extends Prule {
 
     for (AggregateCall aggCall : aggregate.getAggCallList()) {
       String name = aggCall.getAggregation().getName();
-      if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+      if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX") || name.equals("COUNT"))) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index b2378be..31feb48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import net.hydromatic.linq4j.Ord;
 import net.hydromatic.optiq.util.BitSets;
@@ -49,18 +50,19 @@ import org.eigenbase.relopt.RelTraitSet;
 
 import com.beust.jcommander.internal.Lists;
 
-public class HashAggPrel extends AggregateRelBase implements Prel{
+public class HashAggPrel extends AggPrelBase implements Prel{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggPrel.class);
 
   public HashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
-      List<AggregateCall> aggCalls) throws InvalidRelException {
-    super(cluster, traits, child, groupSet, aggCalls);
+      List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+    super(cluster, traits, child, groupSet, aggCalls, phase);
   }
 
   public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
     try {
-      return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+      return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, 
+          this.getOperatorPhase());
     } catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
@@ -88,54 +90,16 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
 
-    final List<String> childFields = getChild().getRowType().getFieldNames();
-    final List<String> fields = getRowType().getFieldNames();
-    List<NamedExpression> keys = Lists.newArrayList();
-    List<NamedExpression> exprs = Lists.newArrayList();
-
-    for (int group : BitSets.toIter(groupSet)) {
-      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
-      keys.add(new NamedExpression(fr, fr));
-    }
-
-    for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
-      FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
-      LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
-      exprs.add(new NamedExpression(expr, ref));
-    }
-
     Prel child = (Prel) this.getChild();
     HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator),
         keys.toArray(new NamedExpression[keys.size()]),
-        exprs.toArray(new NamedExpression[exprs.size()]),
+        aggExprs.toArray(new NamedExpression[aggExprs.size()]),
         1.0f);
 
     return g;
 
   }
 
-  private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
-    List<LogicalExpression> args = Lists.newArrayList();
-    for(Integer i : call.getArgList()){
-      args.add(new FieldReference(fn.get(i)));
-    }
-
-    // for count(1).
-    if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
-    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
-    return expr;
-  }
-
-  @Override
-  public Iterator<Prel> iterator() {
-    return PrelUtil.iter(getChild());
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
-  }
-
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.DEFAULT;
@@ -146,5 +110,4 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
     return SelectionVectorMode.NONE;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 9395a1d..95c8362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -22,6 +22,7 @@ import java.util.logging.Logger;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
@@ -95,7 +96,8 @@ public class HashAggPrule extends AggPruleBase {
 
                 HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    aggregate.getAggCallList(), 
+                    OperatorPhase.PHASE_1of2);
 
                 HashToRandomExchangePrel exch =
                     new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
@@ -103,7 +105,9 @@ public class HashAggPrule extends AggPruleBase {
 
                 HashAggPrel phase2Agg =  new HashAggPrel(aggregate.getCluster(), traits, exch,
                                                          aggregate.getGroupSet(),
-                                                         aggregate.getAggCallList());
+                                                         phase1Agg.getPhase2AggCalls(), 
+                                                         OperatorPhase.PHASE_2of2); 
+                                                    
 
                 call.transformTo(phase2Agg);
               }
@@ -122,7 +126,7 @@ public class HashAggPrule extends AggPruleBase {
     final RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits));
 
     HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
-                                         aggregate.getAggCallList());
+                                         aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
 
     call.transformTo(newAgg);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
index 5fb758a..9706254 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
@@ -19,24 +19,13 @@ package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
 import java.util.BitSet;
-import java.util.Iterator;
 import java.util.List;
 
-import net.hydromatic.linq4j.Ord;
-import net.hydromatic.optiq.util.BitSets;
-
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.SingleMergeExchange;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.rel.AggregateRelBase;
@@ -48,30 +37,27 @@ import org.eigenbase.relopt.RelOptCost;
 import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTraitSet;
 
-import com.beust.jcommander.internal.Lists;
-
-public class StreamAggPrel extends AggregateRelBase implements Prel{
+public class StreamAggPrel extends AggPrelBase implements Prel{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamAggPrel.class);
 
+
+  
   public StreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
-      List<AggregateCall> aggCalls) throws InvalidRelException {
-    super(cluster, traits, child, groupSet, aggCalls);
-    for (AggregateCall aggCall : aggCalls) {
-      if (aggCall.isDistinct()) {
-        throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
-      }
-    }
+      List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+    super(cluster, traits, child, groupSet, aggCalls, phase);
   }
 
   public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
     try {
-      return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+      return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, 
+          this.getOperatorPhase());
     } catch (InvalidRelException e) {
       throw new AssertionError(e);
     }
   }
 
+
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
     if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
@@ -91,51 +77,15 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
-    final List<String> childFields = getChild().getRowType().getFieldNames();
-    final List<String> fields = getRowType().getFieldNames();
-    List<NamedExpression> keys = Lists.newArrayList();
-    List<NamedExpression> exprs = Lists.newArrayList();
-
-    for (int group : BitSets.toIter(groupSet)) {
-      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
-      keys.add(new NamedExpression(fr, fr));
-    }
-
-    for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
-      FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
-      LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
-      exprs.add(new NamedExpression(expr, ref));
-    }
 
     Prel child = (Prel) this.getChild();
-    StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f);
+    StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), 
+        aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f);
 
     return g;
 
   }
-
-  private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
-    List<LogicalExpression> args = Lists.newArrayList();
-    for(Integer i : call.getArgList()){
-      args.add(new FieldReference(fn.get(i)));
-    }
-
-    // for count(1).
-    if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
-    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
-    return expr;
-  }
-
-  @Override
-  public Iterator<Prel> iterator() {
-    return PrelUtil.iter(getChild());
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
-  }
-
+  
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.ALL;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index ff648a4..9a60a14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.util.BitSets;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.eigenbase.rel.AggregateCall;
 import org.eigenbase.rel.InvalidRelException;
@@ -59,8 +60,12 @@ public class StreamAggPrule extends AggPruleBase {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = aggregate.getChild();
     RelCollation collation = getCollation(aggregate);
-
     RelTraitSet traits = null;
+    
+    if (aggregate.containsDistinctCall()) {
+      // currently, don't use StreamingAggregate if any of the logical aggrs contains DISTINCT
+      return;
+    }
 
     try {
       if (aggregate.getGroupSet().isEmpty()) {
@@ -82,14 +87,16 @@ public class StreamAggPrule extends AggPruleBase {
 
                 StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    aggregate.getAggCallList(), 
+                    OperatorPhase.PHASE_1of2);
 
                 UnionExchangePrel exch = 
                     new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
         
                 StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    phase1Agg.getPhase2AggCalls(),
+                    OperatorPhase.PHASE_2of2);
 
                 call.transformTo(phase2Agg);  
               }
@@ -135,7 +142,8 @@ public class StreamAggPrule extends AggPruleBase {
 
                 StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    aggregate.getAggCallList(), 
+                    OperatorPhase.PHASE_1of2);
 
                 int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
 
@@ -147,7 +155,8 @@ public class StreamAggPrule extends AggPruleBase {
 
                 StreamAggPrel phase2Agg =  new StreamAggPrel(aggregate.getCluster(), traits, exch,
                     aggregate.getGroupSet(),
-                    aggregate.getAggCallList());
+                    phase1Agg.getPhase2AggCalls(), 
+                    OperatorPhase.PHASE_2of2);
 
                 call.transformTo(phase2Agg);                   
               }
@@ -166,7 +175,7 @@ public class StreamAggPrule extends AggPruleBase {
     final RelNode convertedInput = convert(input, traits);
     
     StreamAggPrel newAgg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
-                                             aggregate.getAggCallList());
+                                             aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
       
     call.transformTo(newAgg);
   }


[03/14] git commit: DRILL-757: Output mutator interface changes - Output mutator manages schema changes instead of record readers - Removed usages of deprecated interface

Posted by ja...@apache.org.
DRILL-757: Output mutator interface changes
- Output mutator manages schema changes instead of record readers
- Removed usages of deprecated interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/828a5c69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/828a5c69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/828a5c69

Branch: refs/heads/master
Commit: 828a5c69501fd94a64359c6e3abd474f138dc92f
Parents: cb0d46f
Author: Mehant Baid <me...@gmail.com>
Authored: Thu May 15 18:03:28 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 02:34:31 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     |  4 -
 .../drill/exec/physical/impl/OutputMutator.java | 14 ++-
 .../drill/exec/physical/impl/ScanBatch.java     | 99 +++++++++++---------
 .../exec/store/easy/json/JSONRecordReader2.java |  3 +-
 .../drill/exec/store/hive/HiveRecordReader.java |  7 +-
 .../exec/store/ischema/RowRecordReader.java     | 17 +---
 .../drill/exec/store/mock/MockRecordReader.java | 18 ++--
 .../exec/store/parquet/ParquetRecordReader.java | 11 ---
 .../drill/exec/store/pojo/PojoRecordReader.java |  2 -
 .../exec/store/text/DrillTextRecordReader.java  |  6 +-
 .../drill/exec/store/TestOutputMutator.java     | 28 +++---
 .../exec/store/ischema/TestOrphanSchema.java    | 19 +++-
 .../exec/store/ischema/TestTableProvider.java   | 22 +++--
 .../store/parquet/ParquetRecordReaderTest.java  | 29 +++---
 14 files changed, 136 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index dbf8123..ae9f833 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -128,7 +128,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     this.outputMutator = output;
-    output.removeAllFields();
     vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
 
     try {
@@ -141,8 +140,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
           getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
         }
       }
-      output.setNewSchema();
-
       logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
           hbaseTable, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
           hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
@@ -227,7 +224,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
           v.allocateNew();
         }
         vvMap.put(column, v);
-        outputMutator.setNewSchema();
       }
       return v;
     } catch (SchemaChangeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 38d56ea..1aec625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -22,12 +22,16 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.List;
+
 public interface OutputMutator {
-  public void removeField(MaterializedField field) throws SchemaChangeException;
   public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException ;
+  public void allocate(int recordCount);
+  public boolean isNewSchema();
 
-  @Deprecated
-  public void addField(ValueVector vector) throws SchemaChangeException ;
-  public void removeAllFields();
-  public void setNewSchema() throws SchemaChangeException;
+  /* TODO: This interface is added to support information schema tables,
+   * FixedTables, the way they exist currently.
+   * One too many layers to rip out, address it as a separate JIRA.
+   */
+  public void addFields(List<ValueVector> vvList);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a49d1a8..c0810c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -61,10 +61,11 @@ public class ScanBatch implements RecordBatch {
   private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
   final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
+  final Map<MaterializedField, Class<?>> fieldVectorClassMap = Maps.newHashMap();
+  private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
 
   private final VectorContainer container = new VectorContainer();
   private int recordCount;
-  private boolean schemaChanged = true;
   private final FragmentContext context;
   private final OperatorContext oContext;
   private Iterator<RecordReader> readers;
@@ -123,6 +124,7 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public IterOutcome next() {
+    mutator.allocate(MAX_RECORD_CNT);
     while ((recordCount = currentReader.next()) == 0) {
       try {
         if (!readers.hasNext()) {
@@ -133,8 +135,8 @@ public class ScanBatch implements RecordBatch {
         currentReader.cleanup();
         currentReader = readers.next();
         partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
-        mutator.removeAllFields();
         currentReader.setup(mutator);
+        mutator.allocate(MAX_RECORD_CNT);
         addPartitionVectors();
       } catch (ExecutionSetupException e) {
         this.context.fail(e);
@@ -144,28 +146,32 @@ public class ScanBatch implements RecordBatch {
     }
 
     populatePartitionVectors();
-    if (schemaChanged) {
-      schemaChanged = false;
+    if (mutator.isNewSchema()) {
+      container.buildSchema(SelectionVectorMode.NONE);
+      schema = container.getSchema();
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
       return IterOutcome.OK;
     }
   }
 
-  private void addPartitionVectors() {
-    partitionVectors = Lists.newArrayList();
-    for (int i : selectedPartitionColumns) {
-      MaterializedField field;
-      ValueVector v;
-      if (partitionValues.length > i) {
-        field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.required(MinorType.VARCHAR));
-        v = new VarCharVector(field, context.getAllocator());
-      } else {
-        field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
-        v = new NullableVarCharVector(field, context.getAllocator());
+  private void addPartitionVectors() throws ExecutionSetupException{
+    try {
+      partitionVectors = Lists.newArrayList();
+      for (int i : selectedPartitionColumns) {
+        MaterializedField field;
+        ValueVector v;
+        if (partitionValues.length > i) {
+          field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.required(MinorType.VARCHAR));
+          v = mutator.addField(field, VarCharVector.class);
+        } else {
+          field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
+          v = mutator.addField(field, NullableVarCharVector.class);
+        }
+        partitionVectors.add(v);
       }
-      mutator.addField(v);
-      partitionVectors.add(v);
+    } catch(SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
     }
   }
 
@@ -212,43 +218,52 @@ public class ScanBatch implements RecordBatch {
 
   private class Mutator implements OutputMutator {
 
-    public void removeField(MaterializedField field) throws SchemaChangeException {
-      ValueVector vector = fieldVectorMap.remove(field);
-      if (vector == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
-      container.remove(vector);
-      vector.close();
-    }
+    boolean schemaChange = true;
 
-    public void addField(ValueVector vector) {
-      container.add(vector);
-      fieldVectorMap.put(vector.getField(), vector);
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
+      // Check if the field exists
+      ValueVector v = fieldVectorMap.get(field);
+
+      if (v == null || v.getClass() != clazz) {
+        // Field does not exist (or its vector class differs); add it to the map and the output container
+        v = TypeHelper.getNewVector(field, oContext.getAllocator());
+        if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+        container.add(v);
+        fieldVectorMap.put(field, v);
+
+        // Adding a new vector to the container marks the schema as changed
+        schemaChange = true;
+      }
+
+      return (T) v;
     }
 
     @Override
-    public void removeAllFields() {
-      for(VectorWrapper<?> vw : container){
-        vw.clear();
+    public void addFields(List<ValueVector> vvList) {
+      for (ValueVector v : vvList) {
+        fieldVectorMap.put(v.getField(), v);
+        container.add(v);
       }
-      container.clear();
-      fieldVectorMap.clear();
+      schemaChange = true;
     }
 
     @Override
-    public void setNewSchema() throws SchemaChangeException {
-      container.buildSchema(SelectionVectorMode.NONE);
-      schema = container.getSchema();
-      ScanBatch.this.schemaChanged = true;
+    public void allocate(int recordCount) {
+      for (ValueVector v : fieldVectorMap.values()) {
+        AllocationHelper.allocate(v, recordCount, 50, 10);
+      }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      ValueVector v = TypeHelper.getNewVector(field, oContext.getAllocator());
-      if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
-      addField(v);
-      return (T) v;
+    public boolean isNewSchema() {
+      if (schemaChange == true) {
+        schemaChange = false;
+        return true;
+      }
+      return false;
     }
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index bb52a20..37624d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -95,10 +95,9 @@ public class JSONRecordReader2 implements RecordReader{
 
 
       writer.setValueCount(i);
-      mutator.setNewSchema();
       return i;
 
-    }catch(IOException | SchemaChangeException e){
+    }catch(IOException e){
       throw new DrillRuntimeException("Failure while reading JSON file.", e);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 9e01268..4361262 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -218,7 +218,6 @@ public class HiveRecordReader implements RecordReader {
 
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
-    output.removeAllFields();
     try {
       for (int i = 0; i < columnNames.size(); i++) {
         PrimitiveCategory pCat = primitiveCategories.get(i);
@@ -230,11 +229,9 @@ public class HiveRecordReader implements RecordReader {
       for (int i = 0; i < selectedPartitionNames.size(); i++) {
         String type = selectedPartitionTypes.get(i);
         MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), Types.getMajorTypeFromName(type));
-        ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator());
-        pVectors.add(vv);
-        output.addField(vv);
+        Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+        pVectors.add(output.addField(field, vvClass));
       }
-      output.setNewSchema();
     } catch(SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
index ac601d4..c578b5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
@@ -29,8 +29,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.ValueVector;
-
-
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 
 /**
@@ -80,15 +79,8 @@ public class RowRecordReader implements RecordReader {
     
     // Inform drill of the output columns. They were set up when the vector handler was created.
     //  Note we are currently working with fixed tables.
-    try {
-      for (ValueVector v: batch.getValueVectors()) {
-        output.addField(v);;
-      }
-      output.setNewSchema();
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException("Failure while setting up fields", e);
-    }
-    
+    output.addFields(batch.getValueVectors());
+
     // Estimate the number of records we can hold in a RecordBatch
     maxRowCount = batch.getEstimatedRowCount(bufSize);
   }
@@ -101,9 +93,6 @@ public class RowRecordReader implements RecordReader {
   @Override
   public int next() {
     
-    // Make note are are starting a new batch of records
-    batch.beginBatch(maxRowCount);
-    
     // Repeat until out of data or vectors are full
     int actualCount;
     for (actualCount = 0; actualCount < maxRowCount && provider.hasNext(); actualCount++) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 5c07dc5..c7fc939 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mock;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
@@ -34,6 +35,8 @@ import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.List;
+
 public class MockRecordReader implements RecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
 
@@ -76,9 +79,10 @@ public class MockRecordReader implements RecordReader {
 
       for (int i = 0; i < config.getTypes().length; i++) {
         MajorType type = config.getTypes()[i].getMajorType();
-        valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
+        Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+        valueVectors[i] = output.addField(field, vvClass);
       }
-      output.setNewSchema();
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException("Failure while setting up fields", e);
     }
@@ -93,7 +97,6 @@ public class MockRecordReader implements RecordReader {
 
     recordsRead += recordSetSize;
     for(ValueVector v : valueVectors){
-      AllocationHelper.allocate(v, recordSetSize, 50, 10);
 
 //      logger.debug(String.format("MockRecordReader:  Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
       ValueVector.Mutator m = v.getMutator();
@@ -105,14 +108,5 @@ public class MockRecordReader implements RecordReader {
 
   @Override
   public void cleanup() {
-    for (int i = 0; i < valueVectors.length; i++) {
-      try {
-        output.removeField(valueVectors[i].getField());
-      } catch (SchemaChangeException e) {
-        logger.warn("Failure while trying to remove field.", e);
-      }
-      valueVectors[i].close();
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4ca13a5..9cdd205 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -274,14 +274,6 @@ public class ParquetRecordReader implements RecordReader {
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }
-
-//    output.removeAllFields();
-    try {
-      output.setNewSchema();
-    }catch(SchemaChangeException e) {
-      throw new ExecutionSetupException("Error setting up output mutator.", e);
-    }
-
   }
 
   private SchemaPath toFieldName(String[] paths) {
@@ -298,15 +290,12 @@ public class ParquetRecordReader implements RecordReader {
 
   private void resetBatch() {
     for (ColumnReader column : columnStatuses) {
-      AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5);
       column.valuesReadInCurrentPass = 0;
     }
     for (VarLengthColumn r : varLengthReader.columns){
-      AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
     for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
-      AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index fc5a9b4..1ebd1f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -89,8 +89,6 @@ public class PojoRecordReader<T> implements RecordReader{
         }
         writers[i].init(output);
       }
-
-      output.setNewSchema();
     }catch(SchemaChangeException e){
       throw new ExecutionSetupException("Failure while setting up schema for PojoRecordReader.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index ef05b4c..5c3d381 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
@@ -95,12 +96,9 @@ public class DrillTextRecordReader implements RecordReader {
 
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
-    output.removeAllFields();
     MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
-    vector = new RepeatedVarCharVector(field, context.getAllocator());
     try {
-      output.addField(vector);
-      output.setNewSchema();
+      vector = output.addField(field, RepeatedVarCharVector.class);
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
index 37369da..0161632 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -56,18 +57,8 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
     fieldVectorMap.put(vector.getField(), vector);
   }
 
-  @Override
-  public void removeAllFields() {
-    for (VectorWrapper<?> vw : container) {
-      vw.clear();
-    }
-    container.clear();
-    fieldVectorMap.clear();
-  }
-
-  @Override
-  public void setNewSchema() throws SchemaChangeException {
-    container.buildSchema(SelectionVectorMode.NONE);
+  public void addFields(List<ValueVector> v) {
+    return;
   }
 
   public Iterator<VectorWrapper<?>> iterator() {
@@ -75,7 +66,17 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
   }
 
   public void clear(){
-    removeAllFields();
+
+  }
+
+  @Override
+  public boolean isNewSchema() {
+    return false;
+  }
+
+  @Override
+  public void allocate(int recordCount) {
+    return;
   }
 
   @Override
@@ -85,5 +86,4 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
     addField(v);
     return (T) v;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
index 3b8b57b..8eadb56 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
@@ -27,6 +27,7 @@ import net.hydromatic.optiq.SchemaPlus;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -138,18 +139,28 @@ public class TestOrphanSchema extends ExecTest {
       vectors.add(vector);
     }
 
+    public void addFields(List<ValueVector> v) {
+      return;
+    }
+
     public Object get(int column, int row) {
       return vectors.get(column).getAccessor().getObject(row);
     }
 
-    public void removeField(MaterializedField field) {}
-    public void removeAllFields() {}
-    public void setNewSchema() {}
-
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
       return null;
     }
+
+    @Override
+    public void allocate(int recordCount) {
+      return;
+    }
+
+    @Override
+    public boolean isNewSchema() {
+      return false;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
index 8da1ea4..217b792 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
@@ -143,22 +143,28 @@ public class TestTableProvider extends ExecTest {
   static class TestOutput implements OutputMutator {
     List<ValueVector> vectors = new ArrayList<ValueVector>();
 
-    public void addField(ValueVector vector) throws SchemaChangeException {
-      vectors.add(vector);
-    }
-
     public Object get(int column, int row) {
       return vectors.get(column).getAccessor().getObject(row);
     }
 
-    public void removeField(MaterializedField field) {}
-    public void removeAllFields() {}
-    public void setNewSchema() {}
-
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
       return null;
     }
+
+    @Override
+    public void addFields(List<ValueVector> vv) {
+      return;
+    }
+
+    @Override
+    public void allocate(int recordCount) {
+      
+    }
+    @Override
+    public boolean isNewSchema() {
+      return false;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 5d2c859..e594441 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -196,36 +196,33 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     List<MaterializedField> removedFields = Lists.newArrayList();
     List<ValueVector> addFields = Lists.newArrayList();
 
-    @Override
-    public void removeField(MaterializedField field) throws SchemaChangeException {
-      removedFields.add(field);
+
+    List<MaterializedField> getRemovedFields() {
+      return removedFields;
     }
 
-    @Override
-    public void addField(ValueVector vector) throws SchemaChangeException {
-      addFields.add(vector);
+    List<ValueVector> getAddFields() {
+      return addFields;
     }
 
     @Override
-    public void removeAllFields() {
-      addFields.clear();
+    public void addFields(List<ValueVector> vv) {
+      return;
     }
 
     @Override
-    public void setNewSchema() throws SchemaChangeException {
+    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
+      return null;
     }
 
-    List<MaterializedField> getRemovedFields() {
-      return removedFields;
-    }
+    @Override
+    public void allocate(int recordCount) {
 
-    List<ValueVector> getAddFields() {
-      return addFields;
     }
 
     @Override
-    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      return null;
+    public boolean isNewSchema() {
+      return false;
     }
   }
 


[07/14] git commit: DRILL-588: Ignore leading zeroes while determining if digits will fit in a given precision

Posted by ja...@apache.org.
DRILL-588: Ignore leading zeroes while determining if digits will fit in a given precision


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c40735ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c40735ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c40735ed

Branch: refs/heads/master
Commit: c40735ed0e582aba66d27b00a929bae173e8a6a9
Parents: 492ec59
Author: Mehant Baid <me...@gmail.com>
Authored: Mon May 19 11:32:59 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 11:32:59 2014 -0700

----------------------------------------------------------------------
 .../templates/Decimal/CastVarCharDecimal.java    | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c40735ed/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
index e3eb973..8441298 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
@@ -83,6 +83,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
          */
         int integerStartIndex = readIndex;
         int integerEndIndex = endIndex;
+        boolean leadingDigitFound = false;
 
         int radix = 10;
 
@@ -108,6 +109,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
                 byte[] buf = new byte[in.end - in.start];
                 in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
                 throw new org.apache.drill.common.exceptions.DrillRuntimeException(new String(buf, com.google.common.base.Charsets.UTF_8));
+            } else if (leadingDigitFound == false) {
+                if (next == 0) {
+                    // Ignore the leading zeroes while validating if input digits will fit within the given precision
+                    integerStartIndex++;
+                } else {
+                    leadingDigitFound = true;
+                }
             }
             out.value *= radix;
             out.value += next;
@@ -215,7 +223,8 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
         startIndex = readIndex;
 
         int radix = 10;
-
+        boolean leadingDigitFound = false;
+    
         /* This is the first pass, we get the number of integer digits and based on the provided scale
          * we compute which index into the ByteBuf we start storing the integer part of the Decimal
          */
@@ -243,7 +252,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
                     throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
                 }
 
-                integerDigits++;
+                if (leadingDigitFound == false && next != 0) {
+                    leadingDigitFound = true;
+                }
+
+                if (leadingDigitFound == true) {
+                    integerDigits++;
+                }
             }
         }
 


[02/14] git commit: Fix alignment in Hash Join code

Posted by ja...@apache.org.
Fix alignment in Hash Join code


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cb0d46f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cb0d46f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cb0d46f6

Branch: refs/heads/master
Commit: cb0d46f69df2d89601f56b257f3f6c1d97eedc6e
Parents: 1f4276e
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 01:52:03 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 01:52:03 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/join/HashJoinBatch.java  | 634 +++++++++----------
 .../impl/join/HashJoinBatchCreator.java         |  10 +-
 .../exec/physical/impl/join/HashJoinHelper.java | 278 ++++----
 .../exec/physical/impl/join/HashJoinProbe.java  |  38 +-
 .../impl/join/HashJoinProbeTemplate.java        | 310 ++++-----
 5 files changed, 635 insertions(+), 635 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5eec3bb..9afc033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -59,397 +59,397 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
-    // Probe side record batch
-    private final RecordBatch left;
+  // Probe side record batch
+  private final RecordBatch left;
 
-    // Build side record batch
-    private final RecordBatch right;
+  // Build side record batch
+  private final RecordBatch right;
 
-    // Join type, INNER, LEFT, RIGHT or OUTER
-    private final JoinRelType joinType;
+  // Join type, INNER, LEFT, RIGHT or OUTER
+  private final JoinRelType joinType;
 
-    // Join conditions
-    private final List<JoinCondition> conditions;
+  // Join conditions
+  private final List<JoinCondition> conditions;
 
-    // Runtime generated class implementing HashJoinProbe interface
-    private HashJoinProbe hashJoinProbe = null;
+  // Runtime generated class implementing HashJoinProbe interface
+  private HashJoinProbe hashJoinProbe = null;
 
-    /* Helper class
-     * Maintains linked list of build side records with the same key
-     * Keeps information about which build records have a corresponding
-     * matching key in the probe side (for outer, right joins)
-     */
-    private HashJoinHelper hjHelper = null;
+  /* Helper class
+   * Maintains linked list of build side records with the same key
+   * Keeps information about which build records have a corresponding
+   * matching key in the probe side (for outer, right joins)
+   */
+  private HashJoinHelper hjHelper = null;
 
-    // Underlying hashtable used by the hash join
-    private HashTable hashTable = null;
+  // Underlying hashtable used by the hash join
+  private HashTable hashTable = null;
 
-    /* Hyper container to store all build side record batches.
-     * Records are retrieved from this container when there is a matching record
-     * on the probe side
-     */
-    private ExpandableHyperContainer hyperContainer;
+  /* Hyper container to store all build side record batches.
+   * Records are retrieved from this container when there is a matching record
+   * on the probe side
+   */
+  private ExpandableHyperContainer hyperContainer;
 
-    // Number of records in the output container
-    private int outputRecords;
+  // Number of records in the output container
+  private int outputRecords;
 
-    // Current batch index on the build side
-    private int buildBatchIndex = 0;
+  // Current batch index on the build side
+  private int buildBatchIndex = 0;
 
-    // List of vector allocators
-    private List<VectorAllocator> allocators = null;
+  // List of vector allocators
+  private List<VectorAllocator> allocators = null;
 
-    // Schema of the build side
-    private BatchSchema rightSchema = null;
+  // Schema of the build side
+  private BatchSchema rightSchema = null;
 
-    // Generator mapping for the build side
-    private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
-                                                                                  "projectBuildRecord" /* eval method */,
-                                                                                  null /* reset */, null /* cleanup */);
+  // Generator mapping for the build side
+  private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
+                                                                                "projectBuildRecord" /* eval method */,
+                                                                                null /* reset */, null /* cleanup */);
 
-    // Generator mapping for the probe side
-    private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
-                                                                                  "projectProbeRecord" /* eval method */,
-                                                                                  null /* reset */, null /* cleanup */);
+  // Generator mapping for the probe side
+  private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
+                                                                                "projectProbeRecord" /* eval method */,
+                                                                                null /* reset */, null /* cleanup */);
 
-    // Mapping set for the build side
-    private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
-                                                                  "buildBatch" /* read container */,
-                                                                  "outgoing" /* write container */,
-                                                                  PROJECT_BUILD, PROJECT_BUILD);
+  // Mapping set for the build side
+  private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
+                                                                "buildBatch" /* read container */,
+                                                                "outgoing" /* write container */,
+                                                                PROJECT_BUILD, PROJECT_BUILD);
 
-    // Mapping set for the probe side
-    private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
-                                                                  "probeBatch" /* read container */,
-                                                                  "outgoing" /* write container */,
-                                                                  PROJECT_PROBE, PROJECT_PROBE);
+  // Mapping set for the probe side
+  private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+                                                                "probeBatch" /* read container */,
+                                                                "outgoing" /* write container */,
+                                                                PROJECT_PROBE, PROJECT_PROBE);
 
-    // indicates if we have previously returned an output batch
-    boolean firstOutputBatch = true;
+  // indicates if we have previously returned an output batch
+  boolean firstOutputBatch = true;
 
-    IterOutcome leftUpstream = IterOutcome.NONE;
-
-    @Override
-    public int getRecordCount() {
-        return outputRecords;
-    }
+  IterOutcome leftUpstream = IterOutcome.NONE;
 
+  @Override
+  public int getRecordCount() {
+    return outputRecords;
+  }
 
-    @Override
-    public IterOutcome next() {
 
-        try {
-            /* If we are here for the first time, execute the build phase of the
-             * hash join and setup the run time generated class for the probe side
-             */
-            if (hashJoinProbe == null) {
+  @Override
+  public IterOutcome next() {
 
-                // Initialize the hash join helper context
-                hjHelper = new HashJoinHelper(context, oContext.getAllocator());
+    try {
+      /* If we are here for the first time, execute the build phase of the
+       * hash join and setup the run time generated class for the probe side
+       */
+      if (hashJoinProbe == null) {
 
-                /* Build phase requires setting up the hash table. Hash table will
-                 * materialize both the build and probe side expressions while
-                 * creating the hash table. So we need to invoke next() on our probe batch
-                 * as well, for the materialization to be successful. This batch will not be used
-                 * till we complete the build phase.
-                 */
-                leftUpstream = left.next();
+        // Initialize the hash join helper context
+        hjHelper = new HashJoinHelper(context, oContext.getAllocator());
 
-                // Build the hash table, using the build side record batches.
-                executeBuildPhase();
+        /* Build phase requires setting up the hash table. Hash table will
+         * materialize both the build and probe side expressions while
+         * creating the hash table. So we need to invoke next() on our probe batch
+         * as well, for the materialization to be successful. This batch will not be used
+         * till we complete the build phase.
+         */
+        leftUpstream = left.next();
 
-                // Create the run time generated code needed to probe and project
-                hashJoinProbe = setupHashJoinProbe();
-            }
+        // Build the hash table, using the build side record batches.
+        executeBuildPhase();
 
-            // Store the number of records projected
-            if (hashTable != null) {
-
-                // Allocate the memory for the vectors in the output container
-                allocateVectors();
-
-                outputRecords = hashJoinProbe.probeAndProject();
-
-                /* We are here because of one the following
-                 * 1. Completed processing of all the records and we are done
-                 * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
-                 * Either case build the output container's schema and return
-                 */
-                if (outputRecords > 0) {
-
-                  // Build the container schema and set the counts
-                  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-                  container.setRecordCount(outputRecords);
-
-                  for (VectorWrapper<?> v : container) {
-                    v.getValueVector().getMutator().setValueCount(outputRecords);
-                  }
-
-                  // First output batch, return OK_NEW_SCHEMA
-                  if (firstOutputBatch == true) {
-                    firstOutputBatch = false;
-                    return IterOutcome.OK_NEW_SCHEMA;
-                  }
-
-                  // Not the first output batch
-                  return IterOutcome.OK;
-                }
-            } else {
-                // Our build side is empty, we won't have any matches, clear the probe side
-                if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-                    for (VectorWrapper<?> wrapper : left) {
-                      wrapper.getValueVector().clear();
-                    }
-                    leftUpstream = left.next();
-                    while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-                      for (VectorWrapper<?> wrapper : left) {
-                        wrapper.getValueVector().clear();
-                      }
-                      leftUpstream = left.next();
-                    }
-                }
-            }
+        // Create the run time generated code needed to probe and project
+        hashJoinProbe = setupHashJoinProbe();
+      }
 
-            // No more output records, clean up and return
-            return IterOutcome.NONE;
+      // Store the number of records projected
+      if (hashTable != null) {
 
-        } catch (ClassTransformationException | SchemaChangeException | IOException e) {
-            context.fail(e);
-            killIncoming();
-            return IterOutcome.STOP;
-        }
-    }
+        // Allocate the memory for the vectors in the output container
+        allocateVectors();
 
-    public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+        outputRecords = hashJoinProbe.probeAndProject();
 
-        // Setup the hash table configuration object
-        int conditionsSize = conditions.size();
+        /* We are here because of one of the following:
+         * 1. Completed processing of all the records and we are done
+         * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+         * In either case, build the output container's schema and return
+         */
+        if (outputRecords > 0) {
 
-        NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
-        NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+        // Build the container schema and set the counts
+        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        container.setRecordCount(outputRecords);
 
-        // Create named expressions from the conditions
-        for (int i = 0; i < conditionsSize; i++) {
-            rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
-            leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+        for (VectorWrapper<?> v : container) {
+          v.getValueVector().getMutator().setValueCount(outputRecords);
+        }
 
-            // Hash join only supports equality currently.
-            assert conditions.get(i).getRelationship().equals("==");
+        // First output batch, return OK_NEW_SCHEMA
+        if (firstOutputBatch == true) {
+          firstOutputBatch = false;
+          return IterOutcome.OK_NEW_SCHEMA;
         }
 
-        // Set the left named expression to be null if the probe batch is empty.
-        if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
-            leftExpr = null;
-        } else {
-          if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-            throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+          // Not the first output batch
+          return IterOutcome.OK;
+        }
+      } else {
+        // Our build side is empty, we won't have any matches, clear the probe side
+        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+          for (VectorWrapper<?> wrapper : left) {
+            wrapper.getValueVector().clear();
+          }
+          leftUpstream = left.next();
+          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+            for (VectorWrapper<?> wrapper : left) {
+              wrapper.getValueVector().clear();
+            }
+            leftUpstream = left.next();
           }
         }
+      }
 
-        HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
+      // No more output records, clean up and return
+      return IterOutcome.NONE;
 
-        // Create the chained hash table
-        ChainedHashTable ht  = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
-        hashTable = ht.createAndSetupHashTable(null);
+    } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+      context.fail(e);
+      killIncoming();
+      return IterOutcome.STOP;
     }
+  }
 
-    public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
-
-        //Setup the underlying hash table
-        IterOutcome rightUpstream = right.next();
-
-        boolean moreData = true;
-
-        while (moreData) {
-
-            switch (rightUpstream) {
-
-                case NONE:
-                case NOT_YET:
-                case STOP:
-                    moreData = false;
-                    continue;
-
-                case OK_NEW_SCHEMA:
-                    if (rightSchema == null) {
-                        rightSchema = right.getSchema();
-
-                        if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-                          throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
-                        }
-                        setupHashTable();
-                    } else {
-                        throw new SchemaChangeException("Hash join does not support schema changes");
-                    }
-                // Fall through
-                case OK:
-                    int currentRecordCount = right.getRecordCount();
-
-                    /* For every new build batch, we store some state in the helper context
-                     * Add new state to the helper context
-                     */
-                    hjHelper.addNewBatch(currentRecordCount);
-
-                    // Holder contains the global index where the key is hashed into using the hash table
-                    IntHolder htIndex = new IntHolder();
-
-                    // For every record in the build batch , hash the key columns
-                    for (int i = 0; i < currentRecordCount; i++) {
-
-                        HashTable.PutStatus status = hashTable.put(i, htIndex);
-
-                        if (status != HashTable.PutStatus.PUT_FAILED) {
-                            /* Use the global index returned by the hash table, to store
-                             * the current record index and batch index. This will be used
-                             * later when we probe and find a match.
-                             */
-                            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
-                        }
-                    }
-
-                    /* Completed hashing all records in this batch. Transfer the batch
-                     * to the hyper vector container. Will be used when we want to retrieve
-                     * records that have matching keys on the probe side.
-                     */
-                    RecordBatchData nextBatch = new RecordBatchData(right);
-                    if (hyperContainer == null) {
-                        hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
-                    } else {
-                        hyperContainer.addBatch(nextBatch.getContainer());
-                    }
-
-                    // completed processing a batch, increment batch index
-                    buildBatchIndex++;
-                    break;
-            }
-            // Get the next record batch
-            rightUpstream = right.next();
-        }
+  public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+    // Setup the hash table configuration object
+    int conditionsSize = conditions.size();
+
+    NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+    NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+
+    // Create named expressions from the conditions
+    for (int i = 0; i < conditionsSize; i++) {
+      rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
+      leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+
+      // Hash join only supports equality currently.
+      assert conditions.get(i).getRelationship().equals("==");
     }
 
-    public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+    // Set the left named expression to be null if the probe batch is empty.
+    if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
+      leftExpr = null;
+    } else {
+      if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+        throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+      }
+    }
 
-        allocators = new ArrayList<>();
+    HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
-        final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-        ClassGenerator<HashJoinProbe> g = cg.getRoot();
+    // Create the chained hash table
+    ChainedHashTable ht  = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
+    hashTable = ht.createAndSetupHashTable(null);
+  }
 
-        // Generate the code to project build side records
-        g.setMappingSet(projectBuildMapping);
+  public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
 
+    // Fetch the first batch from the build (right) side
+    IterOutcome rightUpstream = right.next();
 
-        int fieldId = 0;
-        JExpression buildIndex = JExpr.direct("buildIndex");
-        JExpression outIndex = JExpr.direct("outIndex");
-        g.rotateBlock();
+    boolean moreData = true;
 
-        if (hyperContainer != null) {
-            for(VectorWrapper<?> vv : hyperContainer) {
+    while (moreData) {
 
-                MajorType inputType = vv.getField().getType();
-                MajorType outputType;
-                if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
-                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-                } else {
-                  outputType = inputType;
-                }
+      switch (rightUpstream) {
 
-                // Add the vector to our output container
-                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
-                container.add(v);
-                allocators.add(RemovingRecordBatch.getAllocator4(v));
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          moreData = false;
+          continue;
 
-                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
-                g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
-                  .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
-                  .arg(outIndex)
-                  .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
+        case OK_NEW_SCHEMA:
+          if (rightSchema == null) {
+            rightSchema = right.getSchema();
 
-                fieldId++;
+            if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+              throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
             }
-        }
-        g.rotateBlock();
-        g.getEvalBlock()._return(JExpr.TRUE);
+            setupHashTable();
+          } else {
+            throw new SchemaChangeException("Hash join does not support schema changes");
+          }
+        // Fall through
+        case OK:
+          int currentRecordCount = right.getRecordCount();
 
-        // Generate the code to project probe side records
-        g.setMappingSet(projectProbeMapping);
+          /* For every new build batch, we store some state in the helper context
+           * Add new state to the helper context
+           */
+          hjHelper.addNewBatch(currentRecordCount);
 
-        int outputFieldId = fieldId;
-        fieldId = 0;
-        JExpression probeIndex = JExpr.direct("probeIndex");
-        int recordCount = 0;
+          // Holder contains the global index where the key is hashed into using the hash table
+          IntHolder htIndex = new IntHolder();
 
-        if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
-            for (VectorWrapper<?> vv : left) {
+          // For every record in the build batch, hash the key columns
+          for (int i = 0; i < currentRecordCount; i++) {
 
-                MajorType inputType = vv.getField().getType();
-                MajorType outputType;
-                if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
-                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-                } else {
-                  outputType = inputType;
-                }
+            HashTable.PutStatus status = hashTable.put(i, htIndex);
 
-                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
-                container.add(v);
-                allocators.add(RemovingRecordBatch.getAllocator4(v));
+            if (status != HashTable.PutStatus.PUT_FAILED) {
+              /* Use the global index returned by the hash table, to store
+               * the current record index and batch index. This will be used
+               * later when we probe and find a match.
+               */
+              hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+            }
+          }
 
-                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
-                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
+          /* Completed hashing all records in this batch. Transfer the batch
+           * to the hyper vector container. Will be used when we want to retrieve
+           * records that have matching keys on the probe side.
+           */
+          RecordBatchData nextBatch = new RecordBatchData(right);
+          if (hyperContainer == null) {
+            hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+          } else {
+            hyperContainer.addBatch(nextBatch.getContainer());
+          }
 
-                g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+          // completed processing a batch, increment batch index
+          buildBatchIndex++;
+          break;
+      }
+      // Get the next record batch
+      rightUpstream = right.next();
+    }
+  }
 
-                fieldId++;
-                outputFieldId++;
-            }
-            g.rotateBlock();
-            g.getEvalBlock()._return(JExpr.TRUE);
+  public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+
+    allocators = new ArrayList<>();
+
+    final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    ClassGenerator<HashJoinProbe> g = cg.getRoot();
+
+    // Generate the code to project build side records
+    g.setMappingSet(projectBuildMapping);
+
+
+    int fieldId = 0;
+    JExpression buildIndex = JExpr.direct("buildIndex");
+    JExpression outIndex = JExpr.direct("outIndex");
+    g.rotateBlock();
 
-            recordCount = left.getRecordCount();
+    if (hyperContainer != null) {
+      for(VectorWrapper<?> vv : hyperContainer) {
+
+        MajorType inputType = vv.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
         }
 
+        // Add the vector to our output container
+        ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
+        container.add(v);
+        allocators.add(RemovingRecordBatch.getAllocator4(v));
 
-        HashJoinProbe hj = context.getImplementationClass(cg);
+        JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
+        g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
+          .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+          .arg(outIndex)
+          .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
 
-        hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
-        return hj;
+        fieldId++;
+      }
     }
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.TRUE);
+
+    // Generate the code to project probe side records
+    g.setMappingSet(projectProbeMapping);
 
-    private void allocateVectors(){
-        for(VectorAllocator a : allocators){
-            a.alloc(RecordBatch.MAX_BATCH_SIZE);
+    int outputFieldId = fieldId;
+    fieldId = 0;
+    JExpression probeIndex = JExpr.direct("probeIndex");
+    int recordCount = 0;
+
+    if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      for (VectorWrapper<?> vv : left) {
+
+        MajorType inputType = vv.getField().getType();
+        MajorType outputType;
+        if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
         }
-    }
 
-    public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-        super(popConfig, context);
-        this.left = left;
-        this.right = right;
-        this.joinType = popConfig.getJoinType();
-        this.conditions = popConfig.getConditions();
-    }
+        ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
+        container.add(v);
+        allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+        JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
+        JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
 
-    @Override
-    public void killIncoming() {
-        this.left.kill();
-        this.right.kill();
+        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+
+        fieldId++;
+        outputFieldId++;
+      }
+      g.rotateBlock();
+      g.getEvalBlock()._return(JExpr.TRUE);
+
+      recordCount = left.getRecordCount();
     }
 
-    @Override
-    public void cleanup() {
-        hjHelper.clear();
 
-        // If we didn't receive any data, hyperContainer may be null, check before clearing
-        if (hyperContainer != null) {
-            hyperContainer.clear();
-        }
+    HashJoinProbe hj = context.getImplementationClass(cg);
 
-        if (hashTable != null) {
-            hashTable.clear();
-        }
-        super.cleanup();
-        left.cleanup();
-        right.cleanup();
+    hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
+    return hj;
+  }
+
+  private void allocateVectors(){
+    for(VectorAllocator a : allocators){
+      a.alloc(RecordBatch.MAX_BATCH_SIZE);
+    }
+  }
+
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context);
+    this.left = left;
+    this.right = right;
+    this.joinType = popConfig.getJoinType();
+    this.conditions = popConfig.getConditions();
+  }
+
+  @Override
+  public void killIncoming() {
+    this.left.kill();
+    this.right.kill();
+  }
+
+  @Override
+  public void cleanup() {
+    hjHelper.clear();
+
+    // If we didn't receive any data, hyperContainer may be null, check before clearing
+    if (hyperContainer != null) {
+      hyperContainer.clear();
+    }
+
+    if (hashTable != null) {
+      hashTable.clear();
     }
+    super.cleanup();
+    left.cleanup();
+    right.cleanup();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index 19a4a29..d925958 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -29,9 +29,9 @@ import java.util.List;
 
 public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
 
-    @Override
-    public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
-        Preconditions.checkArgument(children.size() == 2);
-        return new HashJoinBatch(config, context, children.get(0), children.get(1));
-    }
+  @Override
+  public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 2);
+    return new HashJoinBatch(config, context, children.get(0), children.get(1));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index b1ed07e..a634827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -51,183 +51,183 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
  */
 public class HashJoinHelper {
 
-    /* List of start indexes. Stores the record and batch index of the first record
-     * with a give key.
-     */
-    List<SelectionVector4> startIndices = new ArrayList<>();
-
-    // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
-    List<BuildInfo> buildInfoList = new ArrayList<>();
+  /* List of start indexes. Stores the record and batch index of the first record
+   * with a given key.
+   */
+  List<SelectionVector4> startIndices = new ArrayList<>();
 
-    // Fragment context
-    FragmentContext context;
-    BufferAllocator allocator;
+  // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
+  List<BuildInfo> buildInfoList = new ArrayList<>();
 
-    // Constant to indicate index is empty.
-    static final int INDEX_EMPTY = -1;
+  // Fragment context
+  FragmentContext context;
+  BufferAllocator allocator;
 
-    // bits to shift while obtaining batch index from SV4
-    static final int SHIFT_SIZE = 16;
+  // Constant to indicate index is empty.
+  static final int INDEX_EMPTY = -1;
 
-    public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
-        this.context = context;
-        this.allocator = allocator;
-    }
+  // bits to shift while obtaining batch index from SV4
+  static final int SHIFT_SIZE = 16;
 
-    public void addStartIndexBatch() throws SchemaChangeException {
-        startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
-    }
+  public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
+    this.context = context;
+    this.allocator = allocator;
+}
 
-    public class BuildInfo {
-        // List of links. Logically it helps maintain a linked list of records with the same key value
-        private SelectionVector4 links;
+  public void addStartIndexBatch() throws SchemaChangeException {
+    startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
+  }
 
-        // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
-        private BitSet keyMatchBitVector;
+  public class BuildInfo {
+    // List of links. Logically it helps maintain a linked list of records with the same key value
+    private SelectionVector4 links;
 
-        // number of records in this batch
-        private int recordCount;
+    // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
+    private BitSet keyMatchBitVector;
 
-        public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
-            this.links = links;
-            this.keyMatchBitVector = keyMatchBitVector;
-            this.recordCount = recordCount;
-        }
+    // number of records in this batch
+    private int recordCount;
 
-        public SelectionVector4 getLinks() {
-            return links;
-        }
+    public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
+      this.links = links;
+      this.keyMatchBitVector = keyMatchBitVector;
+      this.recordCount = recordCount;
+    }
 
-        public BitSet getKeyMatchBitVector() {
-            return keyMatchBitVector;
-        }
+    public SelectionVector4 getLinks() {
+      return links;
     }
 
-    public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
+    public BitSet getKeyMatchBitVector() {
+      return keyMatchBitVector;
+    }
+  }
 
-        ByteBuf vector = allocator.buffer((recordCount * 4));
+  public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
 
-        SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
+    ByteBuf vector = allocator.buffer((recordCount * 4));
 
-        // Initialize the vector
-        for (int i = 0; i < recordCount; i++) {
-            sv4.set(i, INDEX_EMPTY);
-        }
+    SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
 
-        return sv4;
+    // Initialize the vector
+    for (int i = 0; i < recordCount; i++) {
+      sv4.set(i, INDEX_EMPTY);
     }
 
-    public void addNewBatch(int recordCount) throws SchemaChangeException {
-        // Add a node to the list of BuildInfo's
-        BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
-        buildInfoList.add(info);
-    }
+    return sv4;
+  }
 
-    public int getStartIndex(int keyIndex) {
-        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
-        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+  public void addNewBatch(int recordCount) throws SchemaChangeException {
+    // Add a node to the list of BuildInfo's
+    BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
+    buildInfoList.add(info);
+  }
 
-        assert batchIdx < startIndices.size();
+  public int getStartIndex(int keyIndex) {
+    int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
+    int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
 
-        SelectionVector4 sv4 = startIndices.get(batchIdx);
+    assert batchIdx < startIndices.size();
 
-        return sv4.get(offsetIdx);
-    }
+    SelectionVector4 sv4 = startIndices.get(batchIdx);
 
-    public int getNextIndex(int currentIdx) {
-        // Get to the links field of the current index to get the next index
-        int batchIdx = currentIdx >>> SHIFT_SIZE;
-        int recordIdx = currentIdx & HashTable.BATCH_MASK;
+    return sv4.get(offsetIdx);
+  }
 
-        assert batchIdx < buildInfoList.size();
+  public int getNextIndex(int currentIdx) {
+    // Get to the links field of the current index to get the next index
+    int batchIdx = currentIdx >>> SHIFT_SIZE;
+    int recordIdx = currentIdx & HashTable.BATCH_MASK;
 
-        // Get the corresponding BuildInfo node
-        BuildInfo info = buildInfoList.get(batchIdx);
-        return info.getLinks().get(recordIdx);
-    }
+    assert batchIdx < buildInfoList.size();
 
-    public List<Integer> getNextUnmatchedIndex() {
-        List<Integer> compositeIndexes = new ArrayList<>();
+    // Get the corresponding BuildInfo node
+    BuildInfo info = buildInfoList.get(batchIdx);
+    return info.getLinks().get(recordIdx);
+  }
 
-        for (int i = 0; i < buildInfoList.size(); i++) {
-            BuildInfo info = buildInfoList.get(i);
-            int fromIndex = 0;
+  public List<Integer> getNextUnmatchedIndex() {
+    List<Integer> compositeIndexes = new ArrayList<>();
 
-            while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
-                compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
-                fromIndex++;
-            }
-        }
-        return compositeIndexes;
+    for (int i = 0; i < buildInfoList.size(); i++) {
+      BuildInfo info = buildInfoList.get(i);
+      int fromIndex = 0;
+
+      while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
+          compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
+          fromIndex++;
+      }
     }
+    return compositeIndexes;
+  }
 
-    public void setRecordMatched(int index) {
-        int batchIdx  = index >>> SHIFT_SIZE;
-        int recordIdx = index & HashTable.BATCH_MASK;
+  public void setRecordMatched(int index) {
+    int batchIdx  = index >>> SHIFT_SIZE;
+    int recordIdx = index & HashTable.BATCH_MASK;
 
-        // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
-        BuildInfo info = buildInfoList.get(batchIdx);
-        BitSet bitVector = info.getKeyMatchBitVector();
+    // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
+    BuildInfo info = buildInfoList.get(batchIdx);
+    BitSet bitVector = info.getKeyMatchBitVector();
 
-        bitVector.set(recordIdx);
-    }
+    bitVector.set(recordIdx);
+  }
 
-    public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
+  public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
 
-        /* set the current record batch index and the index
-         * within the batch at the specified keyIndex. The keyIndex
-         * denotes the global index where the key for this record is
-         * stored in the hash table
+    /* set the current record batch index and the index
+     * within the batch at the specified keyIndex. The keyIndex
+     * denotes the global index where the key for this record is
+     * stored in the hash table
+     */
+    int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
+    int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+    if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
+        // allocate a new batch
+      addStartIndexBatch();
+    }
+
+    SelectionVector4 startIndex = startIndices.get(batchIdx);
+    int linkIndex;
+
+    // If head of the list is empty, insert current index at this position
+    if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
+      startIndex.set(offsetIdx, batchIndex, recordIndex);
+    } else {
+      /* Head of this list is not empty, if the first link
+       * is empty insert there
+       */
+      batchIdx = linkIndex >>> SHIFT_SIZE;
+      offsetIdx = linkIndex & Character.MAX_VALUE;
+
+      SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
+      int firstLink = link.get(offsetIdx);
+
+      if (firstLink == INDEX_EMPTY) {
+        link.set(offsetIdx, batchIndex, recordIndex);
+      } else {
+        /* Insert the current value as the first link and
+         * make the current first link as its next
          */
-        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
-        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
-
-        if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
-            // allocate a new batch
-            addStartIndexBatch();
-        }
-
-        SelectionVector4 startIndex = startIndices.get(batchIdx);
-        int linkIndex;
-
-        // If head of the list is empty, insert current index at this position
-        if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
-            startIndex.set(offsetIdx, batchIndex, recordIndex);
-        } else {
-            /* Head of this list is not empty, if the first link
-             * is empty insert there
-             */
-            batchIdx = linkIndex >>> SHIFT_SIZE;
-            offsetIdx = linkIndex & Character.MAX_VALUE;
-
-            SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
-            int firstLink = link.get(offsetIdx);
-
-            if (firstLink == INDEX_EMPTY) {
-                link.set(offsetIdx, batchIndex, recordIndex);
-            } else {
-                /* Insert the current value as the first link and
-                 * make the current first link as its next
-                 */
-                int firstLinkBatchIdx  = firstLink >>> SHIFT_SIZE;
-                int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
-
-                SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
-                nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
-
-                link.set(offsetIdx, batchIndex, recordIndex);
-            }
-        }
+        int firstLinkBatchIdx  = firstLink >>> SHIFT_SIZE;
+        int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
+
+        SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
+        nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
+
+        link.set(offsetIdx, batchIndex, recordIndex);
+      }
     }
+  }
 
-    public void clear() {
-        // Clear the SV4 used for start indices
-        for (SelectionVector4 sv4: startIndices) {
-            sv4.clear();
-        }
+  public void clear() {
+    // Clear the SV4 used for start indices
+    for (SelectionVector4 sv4: startIndices) {
+      sv4.clear();
+    }
 
-        for (BuildInfo info : buildInfoList) {
-            info.getLinks().clear();
-        }
+    for (BuildInfo info : buildInfoList) {
+      info.getLinks().clear();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 160d352..6d20f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -33,25 +33,25 @@ import org.eigenbase.rel.JoinRelType;
 import java.io.IOException;
 
 public interface HashJoinProbe {
-    public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+  public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
 
-    /* The probe side of the hash join can be in the following two states
-     * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
-     *    key match and if we do we project the record
-     * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
-     *    from the build side that did not match any records on the probe side. For Left outer
-     *    case we handle it internally by projecting the record if there isn't a match on the build side
-     * 3. DONE: Once we have projected all possible records we are done
-     */
-    public static enum ProbeState {
-        PROBE_PROJECT, PROJECT_RIGHT, DONE
-    }
+  /* The probe side of the hash join can be in the following three states
+   * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
+   *    key match and if we do we project the record
+   * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
+   *    from the build side that did not match any records on the probe side. For Left outer
+   *    case we handle it internally by projecting the record if there isn't a match on the build side
+   * 3. DONE: Once we have projected all possible records we are done
+   */
+  public static enum ProbeState {
+    PROBE_PROJECT, PROJECT_RIGHT, DONE
+  }
 
-    public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
-                                            int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
-                                            JoinRelType joinRelType);
-    public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
-    public abstract int  probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
-    public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
-    public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
+  public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+                                          int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+                                          JoinRelType joinRelType);
+  public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+  public abstract int  probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
+  public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
+  public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index a3e3b74..2a8be54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -35,196 +35,196 @@ import java.util.List;
 
 public abstract class HashJoinProbeTemplate implements HashJoinProbe {
 
-    // Probe side record batch
-    private RecordBatch probeBatch;
+  // Probe side record batch
+  private RecordBatch probeBatch;
 
-    // Join type, INNER, LEFT, RIGHT or OUTER
-    private JoinRelType joinType;
+  // Join type, INNER, LEFT, RIGHT or OUTER
+  private JoinRelType joinType;
 
-    /* Helper class
-     * Maintains linked list of build side records with the same key
-     * Keeps information about which build records have a corresponding
-     * matching key in the probe side (for outer, right joins)
-     */
-    private HashJoinHelper hjHelper = null;
+  /* Helper class
+   * Maintains linked list of build side records with the same key
+   * Keeps information about which build records have a corresponding
+   * matching key in the probe side (for outer, right joins)
+   */
+  private HashJoinHelper hjHelper = null;
 
-    // Underlying hashtable used by the hash join
-    private HashTable hashTable = null;
+  // Underlying hashtable used by the hash join
+  private HashTable hashTable = null;
 
-    // Number of records to process on the probe side
-    private int recordsToProcess = 0;
+  // Number of records to process on the probe side
+  private int recordsToProcess = 0;
 
-    // Number of records processed on the probe side
-    private int recordsProcessed = 0;
+  // Number of records processed on the probe side
+  private int recordsProcessed = 0;
 
-    // Number of records in the output container
-    private int outputRecords;
+  // Number of records in the output container
+  private int outputRecords;
 
-    // Indicate if we should drain the next record from the probe side
-    private boolean getNextRecord = true;
+  // Indicate if we should drain the next record from the probe side
+  private boolean getNextRecord = true;
 
-    // Contains both batch idx and record idx of the matching record in the build side
-    private int currentCompositeIdx = -1;
+  // Contains both batch idx and record idx of the matching record in the build side
+  private int currentCompositeIdx = -1;
 
-    // Current state the hash join algorithm is in
-    private ProbeState probeState = ProbeState.PROBE_PROJECT;
+  // Current state the hash join algorithm is in
+  private ProbeState probeState = ProbeState.PROBE_PROJECT;
 
-    // For outer or right joins, this is a list of unmatched records that needs to be projected
-    private List<Integer> unmatchedBuildIndexes = null;
+  // For outer or right joins, this is a list of unmatched records that needs to be projected
+  private List<Integer> unmatchedBuildIndexes = null;
 
-    @Override
-    public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
-                                   int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
-                                   HashJoinHelper hjHelper, JoinRelType joinRelType) {
+  @Override
+  public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+                                 int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
+                                 HashJoinHelper hjHelper, JoinRelType joinRelType) {
 
-        this.probeBatch = probeBatch;
-        this.joinType = joinRelType;
-        this.recordsToProcess = probeRecordCount;
-        this.hashTable = hashTable;
-        this.hjHelper = hjHelper;
+    this.probeBatch = probeBatch;
+    this.joinType = joinRelType;
+    this.recordsToProcess = probeRecordCount;
+    this.hashTable = hashTable;
+    this.hjHelper = hjHelper;
 
-        doSetup(context, buildBatch, probeBatch, outgoing);
+    doSetup(context, buildBatch, probeBatch, outgoing);
+  }
+
+  public void executeProjectRightPhase() {
+    while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
+      boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+      assert success;
     }
+  }
 
-    public void executeProjectRightPhase() {
-        while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
-            boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
-            assert success;
+  public void executeProbePhase() throws SchemaChangeException {
+    while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+
+      // Check if we have processed all records in this batch we need to invoke next
+      if (recordsProcessed == recordsToProcess) {
+
+        // Done processing all records in the previous batch, clean up!
+        for (VectorWrapper<?> wrapper : probeBatch) {
+          wrapper.getValueVector().clear();
         }
-    }
 
-    public void executeProbePhase() throws SchemaChangeException {
-        while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+        IterOutcome leftUpstream = probeBatch.next();
 
-            // Check if we have processed all records in this batch we need to invoke next
-            if (recordsProcessed == recordsToProcess) {
+        switch (leftUpstream) {
+          case NONE:
+          case NOT_YET:
+          case STOP:
+            recordsProcessed = 0;
+            recordsToProcess = 0;
+            probeState = ProbeState.DONE;
 
-                // Done processing all records in the previous batch, clean up!
-                for (VectorWrapper<?> wrapper : probeBatch) {
-                    wrapper.getValueVector().clear();
-                }
+            // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side
+            if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
+                probeState = ProbeState.PROJECT_RIGHT;
+            }
 
-                IterOutcome leftUpstream = probeBatch.next();
+            continue;
 
-                switch (leftUpstream) {
-                    case NONE:
-                    case NOT_YET:
-                    case STOP:
-                        recordsProcessed = 0;
-                        recordsToProcess = 0;
-                        probeState = ProbeState.DONE;
+          case OK_NEW_SCHEMA:
+            throw new SchemaChangeException("Hash join does not support schema changes");
+          case OK:
+            recordsToProcess = probeBatch.getRecordCount();
+            recordsProcessed = 0;
+        }
+      }
+      int probeIndex;
 
-                        // We are done with the probe phase. If its a RIGHT or a FULL join get the unmatched indexes from the build side
-                        if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
-                            probeState = ProbeState.PROJECT_RIGHT;
-                        }
+      // Check if we need to drain the next row in the probe side
+      if (getNextRecord) {
+          probeIndex = hashTable.containsKey(recordsProcessed, true);
 
-                        continue;
+          if (probeIndex != -1) {
 
-                    case OK_NEW_SCHEMA:
-                        throw new SchemaChangeException("Hash join does not support schema changes");
-                    case OK:
-                        recordsToProcess = probeBatch.getRecordCount();
-                        recordsProcessed = 0;
-                }
-            }
-            int probeIndex;
-
-            // Check if we need to drain the next row in the probe side
-            if (getNextRecord) {
-                probeIndex = hashTable.containsKey(recordsProcessed, true);
-
-                if (probeIndex != -1) {
-
-                    /* The current probe record has a key that matches. Get the index
-                     * of the first row in the build side that matches the current key
-                     */
-                    currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
-
-                    /* Record in the build side at currentCompositeIdx has a matching record in the probe
-                     * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
-                     * join we keep track of which records we need to project at the end
-                     */
-                    hjHelper.setRecordMatched(currentCompositeIdx);
-
-                    boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
-                    assert success;
-                    success = projectProbeRecord(recordsProcessed, outputRecords);
-                    assert success;
-                    outputRecords++;
-
-                    /* Projected single row from the build side with matching key but there
-                     * may be more rows with the same key. Check if that's the case
-                     */
-                    currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-                    if (currentCompositeIdx == -1) {
-                        /* We only had one row in the build side that matched the current key
-                         * from the probe side. Drain the next row in the probe side.
-                         */
-                        recordsProcessed++;
-                    }
-                    else {
-                        /* There is more than one row with the same key on the build side
-                         * don't drain more records from the probe side till we have projected
-                         * all the rows with this key
-                         */
-                        getNextRecord = false;
-                    }
-                }
-                else { // No matching key
-
-                    // If we have a left outer join, project the keys
-                    if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-                        projectProbeRecord(recordsProcessed, outputRecords++);
-                    }
-                    recordsProcessed++;
-                }
+            /* The current probe record has a key that matches. Get the index
+             * of the first row in the build side that matches the current key
+             */
+            currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
+
+            /* Record in the build side at currentCompositeIdx has a matching record in the probe
+             * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
+             * join we keep track of which records we need to project at the end
+             */
+            hjHelper.setRecordMatched(currentCompositeIdx);
+
+            boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+            assert success;
+            success = projectProbeRecord(recordsProcessed, outputRecords);
+            assert success;
+            outputRecords++;
+
+            /* Projected single row from the build side with matching key but there
+             * may be more rows with the same key. Check if that's the case
+             */
+            currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+            if (currentCompositeIdx == -1) {
+              /* We only had one row in the build side that matched the current key
+               * from the probe side. Drain the next row in the probe side.
+               */
+              recordsProcessed++;
             }
             else {
-                hjHelper.setRecordMatched(currentCompositeIdx);
-                boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
-                assert success;
-                projectProbeRecord(recordsProcessed, outputRecords);
-                outputRecords++;
-
-                currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-
-                if (currentCompositeIdx == -1) {
-                    // We don't have any more rows matching the current key on the build side, move on to the next probe row
-                    getNextRecord = true;
-                    recordsProcessed++;
-                }
+              /* There is more than one row with the same key on the build side
+               * don't drain more records from the probe side till we have projected
+               * all the rows with this key
+               */
+              getNextRecord = false;
             }
         }
-    }
+          else { // No matching key
 
-    public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
+            // If we have a left outer join, project the keys
+            if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+              projectProbeRecord(recordsProcessed, outputRecords++);
+            }
+            recordsProcessed++;
+          }
+      }
+      else {
+        hjHelper.setRecordMatched(currentCompositeIdx);
+        boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+        assert success;
+        projectProbeRecord(recordsProcessed, outputRecords);
+        outputRecords++;
+
+        currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+
+        if (currentCompositeIdx == -1) {
+          // We don't have any more rows matching the current key on the build side, move on to the next probe row
+          getNextRecord = true;
+          recordsProcessed++;
+        }
+      }
+    }
+  }
 
-        outputRecords = 0;
+  public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
 
-        if (probeState == ProbeState.PROBE_PROJECT) {
-            executeProbePhase();
-        }
+    outputRecords = 0;
 
-        if (probeState == ProbeState.PROJECT_RIGHT) {
+    if (probeState == ProbeState.PROBE_PROJECT) {
+      executeProbePhase();
+    }
 
-            // We are here because we have a RIGHT OUTER or a FULL join
-            if (unmatchedBuildIndexes == null) {
-                // Initialize list of build indexes that didn't match a record on the probe side
-                unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
-                recordsToProcess = unmatchedBuildIndexes.size();
-                recordsProcessed = 0;
-            }
+    if (probeState == ProbeState.PROJECT_RIGHT) {
 
-            // Project the list of unmatched records on the build side
-            executeProjectRightPhase();
-        }
+      // We are here because we have a RIGHT OUTER or a FULL join
+      if (unmatchedBuildIndexes == null) {
+        // Initialize list of build indexes that didn't match a record on the probe side
+        unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
+        recordsToProcess = unmatchedBuildIndexes.size();
+        recordsProcessed = 0;
+      }
 
-        return outputRecords;
+      // Project the list of unmatched records on the build side
+      executeProjectRightPhase();
     }
 
-    public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
-                                 @Named("outgoing") RecordBatch outgoing);
-    public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
-    public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+    return outputRecords;
+  }
+
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
+                               @Named("outgoing") RecordBatch outgoing);
+  public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+  public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
 }


[10/14] git commit: enable multi-phase aggregate by default

Posted by ja...@apache.org.
enable multi-phase aggregate by default


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4b0d060a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4b0d060a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4b0d060a

Branch: refs/heads/master
Commit: 4b0d060a913e8ccb25131c805a8dc0c170ddf191
Parents: 78ae265
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon May 19 17:31:23 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:55 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/PlannerSettings.java  | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4b0d060a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index e65ef17..18a32af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -32,10 +32,10 @@ public class PlannerSettings implements FrameworkContext{
 
   public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
   public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
-  public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);  
-  public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);  
-  public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);  
-  public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", false);  
+  public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);
+  public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
+  public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
+  public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
   public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false);
 
   public OptionManager options = null;
@@ -49,13 +49,13 @@ public class PlannerSettings implements FrameworkContext{
   }
 
   public int numEndPoints() {
-    return numEndPoints;  
+    return numEndPoints;
   }
-  
+
   public boolean useDefaultCosting() {
     return useDefaultCosting;
   }
-    
+
   public void setNumEndPoints(int numEndPoints) {
     this.numEndPoints = numEndPoints;
   }
@@ -63,35 +63,35 @@ public class PlannerSettings implements FrameworkContext{
   public void setUseDefaultCosting(boolean defcost) {
     this.useDefaultCosting = defcost;
   }
-  
+
   public boolean isHashAggEnabled() {
-    return options.getOption(HASHAGG.getOptionName()).bool_val;  
+    return options.getOption(HASHAGG.getOptionName()).bool_val;
   }
-  
+
   public boolean isStreamAggEnabled() {
-    return options.getOption(STREAMAGG.getOptionName()).bool_val;  
+    return options.getOption(STREAMAGG.getOptionName()).bool_val;
   }
-  
+
   public boolean isHashJoinEnabled() {
     return options.getOption(HASHJOIN.getOptionName()).bool_val;
   }
-  
+
   public boolean isMergeJoinEnabled() {
-    return options.getOption(MERGEJOIN.getOptionName()).bool_val;  
+    return options.getOption(MERGEJOIN.getOptionName()).bool_val;
   }
-  
+
   public boolean isMultiPhaseAggEnabled() {
     return options.getOption(MULTIPHASE.getOptionName()).bool_val;
   }
-  
+
   public boolean isBroadcastJoinEnabled() {
-    return options.getOption(BROADCAST.getOptionName()).bool_val;  
+    return options.getOption(BROADCAST.getOptionName()).bool_val;
   }
 
   public int getBroadcastThreshold() {
-    return broadcastThreshold;  
+    return broadcastThreshold;
   }
-  
+
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if(clazz == PlannerSettings.class){


[12/14] git commit: DRILL-671: Select against hbase table with filter against row_key fails

Posted by ja...@apache.org.
DRILL-671: Select against hbase table with filter against row_key fails


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6d4dc8fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6d4dc8fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6d4dc8fe

Branch: refs/heads/master
Commit: 6d4dc8fe00d4aea7f7e6a8e67ea404933d326bcf
Parents: 7388150
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Fri May 16 18:18:47 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:28 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java     | 3 ++-
 .../src/test/java/org/apache/drill/hbase/BaseHBaseTest.java       | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6d4dc8fe/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
index 0e0ccf5..924cd6e 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -53,7 +53,8 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
   }
 
   public HBaseScanSpec parseTree() {
-    return mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), le.accept(this, null));
+    HBaseScanSpec parsedSpec = le.accept(this, null);
+    return parsedSpec != null ? mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec ) : null;
   }
 
   public boolean isAllExpressionsConverted() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6d4dc8fe/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 48193eb..9e07d9f 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -55,7 +55,7 @@ public class BaseHBaseTest extends BaseTestQuery {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     /*
-     * Change the following to HBaseTestsSuite.configure(false, false)
+     * Change the following to HBaseTestsSuite.configure(false, true)
      * if you want to test against an externally running HBase cluster.
      */
     HBaseTestsSuite.configure(true, true);


[06/14] git commit: DRILL-771: Remove explicit cast to varchar for concat() function; implicit cast will be used.

Posted by ja...@apache.org.
DRILL-771: Remove explicit cast to varchar for concat() function; implicit cast will be used.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/492ec59c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/492ec59c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/492ec59c

Branch: refs/heads/master
Commit: 492ec59c17d718bfb0891a249f9e7a841372fad8
Parents: 62c0b1b
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 19:18:51 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 10:43:31 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/planner/logical/DrillOptiq.java     | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/492ec59c/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index 7efd714..e900fc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -301,16 +301,15 @@ public class DrillOptiq {
 
         // Cast arguments to VARCHAR
         List<LogicalExpression> concatArgs = Lists.newArrayList();
-        MajorType castType = Types.required(MinorType.VARCHAR).toBuilder().setWidth(64000).build();
-        concatArgs.add(FunctionCallFactory.createCast(castType, ExpressionPosition.UNKNOWN, args.get(0)));
-        concatArgs.add(FunctionCallFactory.createCast(castType, ExpressionPosition.UNKNOWN, args.get(1)));
+        concatArgs.add(args.get(0));
+        concatArgs.add(args.get(1));
 
         LogicalExpression first = FunctionCallFactory.createExpression(functionName, concatArgs);
 
         for (int i = 2; i < args.size(); i++) {
           concatArgs = Lists.newArrayList();
           concatArgs.add(first);
-          concatArgs.add(FunctionCallFactory.createCast(castType, ExpressionPosition.UNKNOWN, args.get(i)));
+          concatArgs.add(args.get(i));
           first = FunctionCallFactory.createExpression(functionName, concatArgs);
         }