Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/20 05:01:49 UTC
[01/14] git commit: DRILL-713: Fix resizing of the hash table.
Repository: incubator-drill
Updated Branches:
refs/heads/master 70fab8c96 -> 5d7e3d3ab
DRILL-713: Fix resizing of the hash table.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1f4276e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1f4276e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1f4276e9
Branch: refs/heads/master
Commit: 1f4276e909e1629f359f7ccaa2c777b6dce0e6d9
Parents: 70fab8c
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 01:33:53 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 01:33:53 2014 -0700
----------------------------------------------------------------------
.../exec/physical/impl/common/HashTableTemplate.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1f4276e9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 402e395..3a8e609 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -476,15 +476,20 @@ public abstract class HashTableTemplate implements HashTable {
private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
- // resize hash table if needed and transfer the metadata
- resizeAndRehashIfNeeded(currentIdx);
-
addBatchIfNeeded(currentIdx);
BatchHolder bh = batchHolders.get( (currentIdx >>> 16) & BATCH_MASK);
if (bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx)) {
numEntries++ ;
+
+ /* Resize hash table if needed and transfer the metadata
+ * Resize only after inserting the current entry into the hash table
+ * Otherwise our calculated lastEntryBatch and lastEntryIdx
+ * become invalid after the resize.
+ */
+ resizeAndRehashIfNeeded();
+
return true;
}
@@ -548,7 +553,7 @@ public abstract class HashTableTemplate implements HashTable {
// For each entry in the old hash table, re-hash it to the new table and update the metadata
// in the new table.. the metadata consists of the startIndices, links and hashValues.
// Note that the keys stored in the BatchHolders are not moved around.
- private void resizeAndRehashIfNeeded(int currentIdx) {
+ private void resizeAndRehashIfNeeded() {
if (numEntries < threshold)
return;
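The ordering constraint in this fix generalizes: a resize rehashes every entry and rebuilds the table's metadata, so any batch/index coordinates computed before the resize point at stale locations. Below is a minimal, self-contained sketch of the same insert-then-resize pattern in plain Java; the names are hypothetical and this is not Drill's HashTableTemplate, only the ordering idea.

public class InsertThenResize {
  private int[] keys = new int[8];
  private boolean[] used = new boolean[8];
  private int numEntries = 0;
  private int threshold = 6; // resize at ~0.75 load factor

  public void put(int key) {
    insert(keys, used, key);       // 1) place the entry using current-table indices
    numEntries++;
    if (numEntries >= threshold) { // 2) only now is it safe to rehash:
      resizeAndRehash();           //    no index computed above outlives this point
    }
  }

  private void resizeAndRehash() {
    int[] oldKeys = keys;
    boolean[] oldUsed = used;
    keys = new int[oldKeys.length * 2];
    used = new boolean[keys.length];
    threshold *= 2;
    for (int i = 0; i < oldKeys.length; i++) {
      if (oldUsed[i]) {
        insert(keys, used, oldKeys[i]); // every entry moves to a new slot
      }
    }
  }

  private static void insert(int[] ks, boolean[] us, int key) {
    int idx = (key & 0x7fffffff) % ks.length;
    while (us[idx]) {
      idx = (idx + 1) % ks.length;      // linear probing
    }
    ks[idx] = key;
    us[idx] = true;
  }

  public static void main(String[] args) {
    InsertThenResize t = new InsertThenResize();
    for (int i = 0; i < 20; i++) {
      t.put(i * 31);
    }
    System.out.println("capacity after 20 inserts: " + t.keys.length); // 32
  }
}

Deferring the resize to the end of put() means no coordinate computed earlier in the call can survive across a rehash, which is exactly the invariant the commit restores.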
[11/14] git commit: DRILL-764: Add support for 'convert_to()' and 'convert_from()' functions from SQL
Posted by ja...@apache.org.
DRILL-764: Add support for 'convert_to()' and 'convert_from()' functions from SQL
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/73881506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/73881506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/73881506
Branch: refs/heads/master
Commit: 738815061432fb610962bc72bebda3f52d5b148a
Parents: 4b0d060
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Fri May 16 17:13:44 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:13 2014 -0700
----------------------------------------------------------------------
.../expr/fn/impl/conv/DummyConvertFrom.java | 43 ++++++
.../exec/expr/fn/impl/conv/DummyConvertTo.java | 43 ++++++
.../drill/exec/planner/logical/DrillOptiq.java | 3 +
.../java/org/apache/drill/BaseTestQuery.java | 2 +-
.../physical/impl/TestConvertFunctions.java | 147 +++++++++++--------
5 files changed, 176 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java
new file mode 100644
index 0000000..ac75f48
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertFrom.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.expr.fn.impl.conv;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * This class and {@link DummyConvertTo} merely act as placeholders so that Optiq
+ * allows 'convert_to()' and 'convert_from()' functions in SQL.
+ */
+@FunctionTemplate(name = "convert_from", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+public class DummyConvertFrom implements DrillSimpleFunc {
+
+ @Output VarBinaryHolder out;
+
+ @Override
+ public void setup(RecordBatch incoming) { }
+
+ @Override
+ public void eval() { }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java
new file mode 100644
index 0000000..36ddf6d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DummyConvertTo.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.expr.fn.impl.conv;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * This class and {@link DummyConvertFrom} merely act as placeholders so that Optiq
+ * allows 'convert_to()' and 'convert_from()' functions in SQL.
+ */
+@FunctionTemplate(name = "convert_to", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+public class DummyConvertTo implements DrillSimpleFunc {
+
+ @Output VarBinaryHolder out;
+
+ @Override
+ public void setup(RecordBatch incoming) { }
+
+ @Override
+ public void eval() { }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index e900fc2..1e0f7ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -326,6 +326,9 @@ public class DrillOptiq {
return FunctionCallFactory.createExpression(functionName, args.subList(0, 1));
}
+ } else if ((functionName.equals("convert_from") || functionName.equals("convert_to"))
+ && args.get(1) instanceof QuotedString) {
+ return FunctionCallFactory.createConvert(functionName, ((QuotedString)args.get(1)).value, args.get(0), ExpressionPosition.UNKNOWN);
}
return FunctionCallFactory.createExpression(functionName, args);
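The DrillOptiq hunk above is the plan-time half of the feature: when the second argument of convert_from()/convert_to() is a string literal, the call is rewritten into a dedicated convert expression (the Dummy* functions exist only so the SQL validator accepts the names). A sketch of that dispatch pattern on a toy expression tree, with hypothetical record types rather than Drill's planner classes (Java 17):

import java.util.List;

public class ConvertRewrite {
  interface Expr {}
  record Literal(String value) implements Expr {}
  record Call(String name, List<Expr> args) implements Expr {}
  record Convert(String function, String encoding, Expr input) implements Expr {}

  static Expr rewrite(Call call) {
    boolean isConvert = call.name().equals("convert_from") || call.name().equals("convert_to");
    if (isConvert && call.args().size() == 2 && call.args().get(1) instanceof Literal enc) {
      // the encoding is known at plan time, so emit the specialized node
      return new Convert(call.name(), enc.value().toUpperCase(), call.args().get(0));
    }
    return call; // generic function call otherwise, same fallthrough as the hunk above
  }

  public static void main(String[] args) {
    Expr col = new Call("col", List.of()); // toy stand-in for a column reference
    System.out.println(rewrite(new Call("convert_from", List.of(col, new Literal("INT_BE")))));
    // Convert[function=convert_from, encoding=INT_BE, input=Call[name=col, args=[]]]
  }
}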
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 062511e..8121a54 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -113,7 +113,7 @@ public class BaseTestQuery extends ExecTest{
return testRunAndReturn(QueryType.PHYSICAL, physical);
}
- private List<QueryResultBatch> testRunAndReturn(QueryType type, String query) throws Exception{
+ protected List<QueryResultBatch> testRunAndReturn(QueryType type, String query) throws Exception{
query = query.replace("[WORKING_PATH]", TestTools.getWorkingPath());
return client.runQuery(type, query);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/73881506/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
index 4f9e8e9..e2935a9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
@@ -30,8 +30,8 @@ import java.util.List;
import mockit.Injectable;
import org.apache.drill.BaseTestQuery;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserServer;
@@ -66,172 +66,190 @@ public class TestConvertFunctions extends BaseTestQuery {
@Test
public void testDateTime1() throws Throwable {
- runTest("(convert_from(binary_string('" + DATE_TIME_BE + "'), 'TIME_EPOCH_BE'))", time);
+ verifyPhysicalPlan("(convert_from(binary_string('" + DATE_TIME_BE + "'), 'TIME_EPOCH_BE'))", time);
}
@Test
public void testDateTime2() throws Throwable {
- runTest("convert_from(binary_string('" + DATE_TIME_LE + "'), 'TIME_EPOCH')", time);
+ verifyPhysicalPlan("convert_from(binary_string('" + DATE_TIME_LE + "'), 'TIME_EPOCH')", time);
}
@Test
public void testDateTime3() throws Throwable {
- runTest("convert_from(binary_string('" + DATE_TIME_BE + "'), 'DATE_EPOCH_BE')", date );
+ verifyPhysicalPlan("convert_from(binary_string('" + DATE_TIME_BE + "'), 'DATE_EPOCH_BE')", date );
}
@Test
public void testDateTime4() throws Throwable {
- runTest("convert_from(binary_string('" + DATE_TIME_LE + "'), 'DATE_EPOCH')", date);
+ verifyPhysicalPlan("convert_from(binary_string('" + DATE_TIME_LE + "'), 'DATE_EPOCH')", date);
}
@Test
public void testFixedInts1() throws Throwable {
- runTest("convert_from(binary_string('\\xAD'), 'TINYINT')", (byte) 0xAD);
+ verifyPhysicalPlan("convert_from(binary_string('\\xAD'), 'TINYINT')", (byte) 0xAD);
}
@Test
public void testFixedInts2() throws Throwable {
- runTest("convert_from(binary_string('\\xFE\\xCA'), 'SMALLINT')", (short) 0xCAFE);
+ verifyPhysicalPlan("convert_from(binary_string('\\xFE\\xCA'), 'SMALLINT')", (short) 0xCAFE);
}
@Test
public void testFixedInts3() throws Throwable {
- runTest("convert_from(binary_string('\\xCA\\xFE'), 'SMALLINT_BE')", (short) 0xCAFE);
+ verifyPhysicalPlan("convert_from(binary_string('\\xCA\\xFE'), 'SMALLINT_BE')", (short) 0xCAFE);
}
@Test
public void testFixedInts4() throws Throwable {
- runTest("convert_from(binary_string('\\xBE\\xBA\\xFE\\xCA'), 'INT')", 0xCAFEBABE);
+ verifyPhysicalPlan("convert_from(binary_string('\\xBE\\xBA\\xFE\\xCA'), 'INT')", 0xCAFEBABE);
+ }
+
+ @Test
+ public void testFixedInts4SQL_from() throws Throwable {
+ verifySQL("select"
+ + " convert_from(binary_string('\\xBE\\xBA\\xFE\\xCA'), 'INT')"
+ + " from"
+ + " cp.`employee.json` LIMIT 1",
+ 0xCAFEBABE);
+ }
+
+ @Test
+ public void testFixedInts4SQL_to() throws Throwable {
+ verifySQL("select"
+ + " convert_to(-889275714, 'INT')"
+ + " from"
+ + " cp.`employee.json` LIMIT 1",
+ new byte[] {(byte) 0xBE, (byte) 0xBA, (byte) 0xFE, (byte) 0xCA});
}
@Test
public void testFixedInts5() throws Throwable {
- runTest("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE'), 'INT_BE')", 0xCAFEBABE);
+ verifyPhysicalPlan("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE'), 'INT_BE')", 0xCAFEBABE);
}
@Test
public void testFixedInts6() throws Throwable {
- runTest("convert_from(binary_string('\\xEF\\xBE\\xAD\\xDE\\xBE\\xBA\\xFE\\xCA'), 'BIGINT')", 0xCAFEBABEDEADBEEFL);
+ verifyPhysicalPlan("convert_from(binary_string('\\xEF\\xBE\\xAD\\xDE\\xBE\\xBA\\xFE\\xCA'), 'BIGINT')", 0xCAFEBABEDEADBEEFL);
}
@Test
public void testFixedInts7() throws Throwable {
- runTest("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE\\xDE\\xAD\\xBE\\xEF'), 'BIGINT_BE')", 0xCAFEBABEDEADBEEFL);
+ verifyPhysicalPlan("convert_from(binary_string('\\xCA\\xFE\\xBA\\xBE\\xDE\\xAD\\xBE\\xEF'), 'BIGINT_BE')", 0xCAFEBABEDEADBEEFL);
}
@Test
public void testFixedInts8() throws Throwable {
- runTest("convert_from(convert_to(cast(77 as varchar(2)), 'INT_BE'), 'INT_BE')", 77);
+ verifyPhysicalPlan("convert_from(convert_to(cast(77 as varchar(2)), 'INT_BE'), 'INT_BE')", 77);
}
@Test
public void testFixedInts9() throws Throwable {
- runTest("convert_to(cast(77 as varchar(2)), 'INT_BE')", new byte[] {0, 0, 0, 77});
+ verifyPhysicalPlan("convert_to(cast(77 as varchar(2)), 'INT_BE')", new byte[] {0, 0, 0, 77});
}
@Test
public void testFixedInts10() throws Throwable {
- runTest("convert_to(cast(77 as varchar(2)), 'INT')", new byte[] {77, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(cast(77 as varchar(2)), 'INT')", new byte[] {77, 0, 0, 0});
}
@Test
public void testFixedInts11() throws Throwable {
- runTest("convert_to(77, 'BIGINT_BE')", new byte[] {0, 0, 0, 0, 0, 0, 0, 77});
+ verifyPhysicalPlan("convert_to(77, 'BIGINT_BE')", new byte[] {0, 0, 0, 0, 0, 0, 0, 77});
}
@Test
public void testFixedInts12() throws Throwable {
- runTest("convert_to(9223372036854775807, 'BIGINT')", new byte[] {-1, -1, -1, -1, -1, -1, -1, 0x7f});
+ verifyPhysicalPlan("convert_to(9223372036854775807, 'BIGINT')", new byte[] {-1, -1, -1, -1, -1, -1, -1, 0x7f});
}
@Test
public void testFixedInts13() throws Throwable {
- runTest("convert_to(-9223372036854775808, 'BIGINT')", new byte[] {0, 0, 0, 0, 0, 0, 0, (byte)0x80});
+ verifyPhysicalPlan("convert_to(-9223372036854775808, 'BIGINT')", new byte[] {0, 0, 0, 0, 0, 0, 0, (byte)0x80});
}
@Test
public void testVInts1() throws Throwable {
- runTest("convert_to(cast(0 as int), 'INT_HADOOPV')", new byte[] {0});
+ verifyPhysicalPlan("convert_to(cast(0 as int), 'INT_HADOOPV')", new byte[] {0});
}
@Test
public void testVInts2() throws Throwable {
- runTest("convert_to(cast(128 as int), 'INT_HADOOPV')", new byte[] {-113, -128});
+ verifyPhysicalPlan("convert_to(cast(128 as int), 'INT_HADOOPV')", new byte[] {-113, -128});
}
@Test
public void testVInts3() throws Throwable {
- runTest("convert_to(cast(256 as int), 'INT_HADOOPV')", new byte[] {-114, 1, 0});
+ verifyPhysicalPlan("convert_to(cast(256 as int), 'INT_HADOOPV')", new byte[] {-114, 1, 0});
}
@Test
public void testVInts4() throws Throwable {
- runTest("convert_to(cast(65536 as int), 'INT_HADOOPV')", new byte[] {-115, 1, 0, 0});
+ verifyPhysicalPlan("convert_to(cast(65536 as int), 'INT_HADOOPV')", new byte[] {-115, 1, 0, 0});
}
@Test
public void testVInts5() throws Throwable {
- runTest("convert_to(cast(16777216 as int), 'INT_HADOOPV')", new byte[] {-116, 1, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(cast(16777216 as int), 'INT_HADOOPV')", new byte[] {-116, 1, 0, 0, 0});
}
@Test
public void testVInts6() throws Throwable {
- runTest("convert_to(4294967296, 'BIGINT_HADOOPV')", new byte[] {-117, 1, 0, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(4294967296, 'BIGINT_HADOOPV')", new byte[] {-117, 1, 0, 0, 0, 0});
}
@Test
public void testVInts7() throws Throwable {
- runTest("convert_to(1099511627776, 'BIGINT_HADOOPV')", new byte[] {-118, 1, 0, 0, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(1099511627776, 'BIGINT_HADOOPV')", new byte[] {-118, 1, 0, 0, 0, 0, 0});
}
@Test
public void testVInts8() throws Throwable {
- runTest("convert_to(281474976710656, 'BIGINT_HADOOPV')", new byte[] {-119, 1, 0, 0, 0, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(281474976710656, 'BIGINT_HADOOPV')", new byte[] {-119, 1, 0, 0, 0, 0, 0, 0});
}
@Test
public void testVInts9() throws Throwable {
- runTest("convert_to(72057594037927936, 'BIGINT_HADOOPV')", new byte[] {-120, 1, 0, 0, 0, 0, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(72057594037927936, 'BIGINT_HADOOPV')", new byte[] {-120, 1, 0, 0, 0, 0, 0, 0, 0});
}
@Test
public void testVInts10() throws Throwable {
- runTest("convert_to(9223372036854775807, 'BIGINT_HADOOPV')", new byte[] {-120, 127, -1, -1, -1, -1, -1, -1, -1});
+ verifyPhysicalPlan("convert_to(9223372036854775807, 'BIGINT_HADOOPV')", new byte[] {-120, 127, -1, -1, -1, -1, -1, -1, -1});
}
@Test
public void testVInts11() throws Throwable {
- runTest("convert_from(binary_string('\\x88\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", 9223372036854775807L);
+ verifyPhysicalPlan("convert_from(binary_string('\\x88\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", 9223372036854775807L);
}
@Test
public void testVInts12() throws Throwable {
- runTest("convert_to(-9223372036854775808, 'BIGINT_HADOOPV')", new byte[] {-128, 127, -1, -1, -1, -1, -1, -1, -1});
+ verifyPhysicalPlan("convert_to(-9223372036854775808, 'BIGINT_HADOOPV')", new byte[] {-128, 127, -1, -1, -1, -1, -1, -1, -1});
}
@Test
public void testVInts13() throws Throwable {
- runTest("convert_from(binary_string('\\x80\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", -9223372036854775808L);
+ verifyPhysicalPlan("convert_from(binary_string('\\x80\\x7f\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF'), 'BIGINT_HADOOPV')", -9223372036854775808L);
}
@Test
public void testBool1() throws Throwable {
- runTest("convert_from(binary_string('\\x01'), 'BOOLEAN_BYTE')", true);
+ verifyPhysicalPlan("convert_from(binary_string('\\x01'), 'BOOLEAN_BYTE')", true);
}
@Test
public void testBool2() throws Throwable {
- runTest("convert_from(binary_string('\\x00'), 'BOOLEAN_BYTE')", false);
+ verifyPhysicalPlan("convert_from(binary_string('\\x00'), 'BOOLEAN_BYTE')", false);
}
@Test
public void testBool3() throws Throwable {
- runTest("convert_to(true, 'BOOLEAN_BYTE')", new byte[] {1});
+ verifyPhysicalPlan("convert_to(true, 'BOOLEAN_BYTE')", new byte[] {1});
}
@Test
public void testBool4() throws Throwable {
- runTest("convert_to(false, 'BOOLEAN_BYTE')", new byte[] {0});
+ verifyPhysicalPlan("convert_to(false, 'BOOLEAN_BYTE')", new byte[] {0});
}
@Test
@@ -240,47 +258,47 @@ public class TestConvertFunctions extends BaseTestQuery {
@Test
public void testFloats2() throws Throwable {
- runTest("convert_from(convert_to(cast(77 as float4), 'FLOAT'), 'FLOAT')", new Float(77.0));
+ verifyPhysicalPlan("convert_from(convert_to(cast(77 as float4), 'FLOAT'), 'FLOAT')", new Float(77.0));
}
@Test
public void testFloats3() throws Throwable {
- runTest("convert_to(cast(1.4e-45 as float4), 'FLOAT')", new byte[] {1, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(cast(1.4e-45 as float4), 'FLOAT')", new byte[] {1, 0, 0, 0});
}
@Test
public void testFloats4() throws Throwable {
- runTest("convert_to(cast(3.4028235e+38 as float4), 'FLOAT')", new byte[] {-1, -1, 127, 127});
+ verifyPhysicalPlan("convert_to(cast(3.4028235e+38 as float4), 'FLOAT')", new byte[] {-1, -1, 127, 127});
}
@Test
public void testFloats5(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable {
- runTest("convert_from(convert_to(cast(77 as float8), 'DOUBLE'), 'DOUBLE')", 77.0);
+ verifyPhysicalPlan("convert_from(convert_to(cast(77 as float8), 'DOUBLE'), 'DOUBLE')", 77.0);
}
@Test
public void testFloats6(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable {
- runTest("convert_to(cast(77 as float8), 'DOUBLE')", new byte[] {0, 0, 0, 0, 0, 64, 83, 64});
+ verifyPhysicalPlan("convert_to(cast(77 as float8), 'DOUBLE')", new byte[] {0, 0, 0, 0, 0, 64, 83, 64});
}
@Test
public void testFloats7(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable {
- runTest("convert_to(4.9e-324, 'DOUBLE')", new byte[] {1, 0, 0, 0, 0, 0, 0, 0});
+ verifyPhysicalPlan("convert_to(4.9e-324, 'DOUBLE')", new byte[] {1, 0, 0, 0, 0, 0, 0, 0});
}
@Test
public void testFloats8(@Injectable final DrillbitContext bitContext,
@Injectable UserServer.UserClientConnection connection) throws Throwable {
- runTest("convert_to(1.7976931348623157e+308, 'DOUBLE')", new byte[] {-1, -1, -1, -1, -1, -1, -17, 127});
+ verifyPhysicalPlan("convert_to(1.7976931348623157e+308, 'DOUBLE')", new byte[] {-1, -1, -1, -1, -1, -1, -17, 127});
}
@Test
public void testUTF8() throws Throwable {
- runTest("convert_from(binary_string('apache_drill'), 'UTF8')", "apache_drill");
- runTest("convert_to('apache_drill', 'UTF8')", new byte[] {'a', 'p', 'a', 'c', 'h', 'e', '_', 'd', 'r', 'i', 'l', 'l'});
+ verifyPhysicalPlan("convert_from(binary_string('apache_drill'), 'UTF8')", "apache_drill");
+ verifyPhysicalPlan("convert_to('apache_drill', 'UTF8')", new byte[] {'a', 'p', 'a', 'c', 'h', 'e', '_', 'd', 'r', 'i', 'l', 'l'});
}
@Test
@@ -340,31 +358,27 @@ public class TestConvertFunctions extends BaseTestQuery {
assertEquals(intVal, Integer.MIN_VALUE);
}
- protected <T> void runTest(String expression, T expectedResults) throws Throwable {
- String testName = String.format("Expression: %s.", expression);
+ protected <T> void verifySQL(String sql, T expectedResults) throws Throwable {
+ verifyResults(sql, expectedResults, getRunResult(QueryType.SQL, sql));
+ }
+
+ protected <T> void verifyPhysicalPlan(String expression, T expectedResults) throws Throwable {
expression = expression.replace("\\", "\\\\\\\\"); // "\\\\\\\\" => Java => "\\\\" => JsonParser => "\\" => AntlrParser "\"
if (textFileContent == null) textFileContent = Resources.toString(Resources.getResource(CONVERSION_TEST_PHYSICAL_PLAN), Charsets.UTF_8);
String planString = textFileContent.replace("__CONVERT_EXPRESSION__", expression);
- Object[] results = getRunResult(planString);
- assertEquals(testName, 1, results.length);
- assertNotNull(testName, results[0]);
- if (expectedResults.getClass().isArray()) {
- assertArraysEquals(testName, expectedResults, results[0]);
- } else {
- assertEquals(testName, expectedResults, results[0]);
- }
+ verifyResults(expression, expectedResults, getRunResult(QueryType.PHYSICAL, planString));
}
- protected Object[] getRunResult(String planString) throws Exception {
+ protected Object[] getRunResult(QueryType queryType, String planString) throws Exception {
+ List<QueryResultBatch> resultList = testRunAndReturn(queryType, planString);
+
List<Object> res = new ArrayList<Object>();
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-
- List<QueryResultBatch> resultList = testPhysicalWithResults(planString);
for(QueryResultBatch result : resultList) {
- loader.load(result.getHeader().getDef(), result.getData());
- if (loader.getRecordCount() > 0) {
+ if (result.getData() != null) {
+ loader.load(result.getHeader().getDef(), result.getData());
ValueVector v = loader.iterator().next().getValueVector();
for (int j = 0; j < v.getAccessor().getValueCount(); j++) {
if (v instanceof VarCharVector) {
@@ -373,14 +387,25 @@ public class TestConvertFunctions extends BaseTestQuery {
res.add(v.getAccessor().getObject(j));
}
}
+ loader.clear();
+ result.release();
}
- loader.clear();
- result.release();
}
return res.toArray();
}
+ protected <T> void verifyResults(String expression, T expectedResults, Object[] actualResults) throws Throwable {
+ String testName = String.format("Expression: %s.", expression);
+ assertEquals(testName, 1, actualResults.length);
+ assertNotNull(testName, actualResults[0]);
+ if (expectedResults.getClass().isArray()) {
+ assertArraysEquals(testName, expectedResults, actualResults[0]);
+ } else {
+ assertEquals(testName, expectedResults, actualResults[0]);
+ }
+ }
+
protected void assertArraysEquals(Object expected, Object actual) {
assertArraysEquals(null, expected, actual);
}
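As a worked check of the constants asserted in testFixedInts4SQL_from/_to above: 0xCAFEBABE is -889275714 as a signed 32-bit int, the 'INT' encoding is little-endian, and 'INT_BE' is big-endian. A plain-JDK snippet (not Drill code) reproducing both byte layouts:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class IntEncodingCheck {
  public static void main(String[] args) {
    int v = 0xCAFEBABE;
    System.out.println(v); // -889275714, the literal used in testFixedInts4SQL_to
    byte[] le = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array();
    byte[] be = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(v).array();
    System.out.printf("LE: %02X %02X %02X %02X%n",
        le[0] & 0xFF, le[1] & 0xFF, le[2] & 0xFF, le[3] & 0xFF); // BE BA FE CA
    System.out.printf("BE: %02X %02X %02X %02X%n",
        be[0] & 0xFF, be[1] & 0xFF, be[2] & 0xFF, be[3] & 0xFF); // CA FE BA BE
  }
}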
[04/14] git commit: DRILL-769: Remove swapping using decimal holders.
Posted by ja...@apache.org.
DRILL-769: Remove swapping using decimal holders.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5a78ff8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5a78ff8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5a78ff8c
Branch: refs/heads/master
Commit: 5a78ff8c546b80b0f747c558688786714eb09b20
Parents: 828a5c6
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 16:54:39 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 10:43:14 2014 -0700
----------------------------------------------------------------------
.../templates/Decimal/DecimalFunctions.java | 78 ++++++++++----------
.../main/codegen/templates/ValueHolders.java | 27 -------
.../drill/jdbc/test/TestFunctionsQuery.java | 13 ++++
3 files changed, 50 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a78ff8c/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
index 6cd6ade..8f14e83 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
@@ -31,26 +31,26 @@ import org.apache.drill.exec.expr.annotations.Workspace;
}
</#macro>
-<#macro subtractBlock holderType left right result>
+<#macro subtractBlock holderType in1 in2 result>
/* compute the result's size, integer part and fractional part */
- result.scale = Math.max(left.scale, right.scale);
+ result.scale = Math.max(${in1}.scale, ${in2}.scale);
result.precision = result.maxPrecision;
int resultScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(result.scale);
int resultIndex = result.nDecimalDigits- 1;
- int leftScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(left.scale);
- int leftIntRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(left.precision - left.scale);
- int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(right.scale);
+ int leftScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(${in1}.scale);
+ int leftIntRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(${in1}.precision - ${in1}.scale);
+ int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(${in2}.scale);
- int leftIndex = left.nDecimalDigits - 1;
- int rightIndex = right.nDecimalDigits - 1;
+ int leftIndex = ${in1}.nDecimalDigits - 1;
+ int rightIndex = ${in2}.nDecimalDigits - 1;
/* If the left scale is bigger, simply copy over the digits into result */
while (leftScaleRoundedUp > rightScaleRoundedUp) {
- result.setInteger(resultIndex, left.getInteger(leftIndex));
+ result.setInteger(resultIndex, ${in1}.getInteger(leftIndex));
leftIndex--;
resultIndex--;
leftScaleRoundedUp--;
@@ -60,7 +60,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
int carry = 0;
while(rightScaleRoundedUp > leftScaleRoundedUp) {
- int difference = 0 - right.getInteger(rightIndex) - carry;
+ int difference = 0 - ${in2}.getInteger(rightIndex) - carry;
rightIndex--;
if (difference < 0) {
@@ -80,7 +80,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
*/
while (leftScaleRoundedUp > 0) {
- int difference = left.getInteger(leftIndex) - right.getInteger(rightIndex) - carry;
+ int difference = ${in1}.getInteger(leftIndex) - ${in2}.getInteger(rightIndex) - carry;
leftIndex--;
rightIndex--;
@@ -100,11 +100,11 @@ import org.apache.drill.exec.expr.annotations.Workspace;
*/
while(leftIntRoundedUp > 0) {
- int difference = left.getInteger(leftIndex);
+ int difference = ${in1}.getInteger(leftIndex);
leftIndex--;
if (rightIndex >= 0) {
- difference -= right.getInteger(rightIndex);
+ difference -= ${in2}.getInteger(rightIndex);
rightIndex--;
}
@@ -123,20 +123,20 @@ import org.apache.drill.exec.expr.annotations.Workspace;
</#macro>
-<#macro addBlock holderType left right result>
+<#macro addBlock holderType in1 in2 result>
/* compute the result scale */
- result.scale = Math.max(left.scale, right.scale);
+ result.scale = Math.max(${in1}.scale, ${in2}.scale);
result.precision = result.maxPrecision;
int resultScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(result.scale);
- int leftScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(left.scale);
- int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(right.scale);
+ int leftScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(${in1}.scale);
+ int rightScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(${in2}.scale);
/* starting index for each decimal */
- int leftIndex = left.nDecimalDigits - 1;
- int rightIndex = right.nDecimalDigits - 1;
+ int leftIndex = ${in1}.nDecimalDigits - 1;
+ int rightIndex = ${in2}.nDecimalDigits - 1;
int resultIndex = result.nDecimalDigits - 1;
/* If one of the scale is larger then simply copy it over
@@ -144,7 +144,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
*/
while (leftScaleRoundedUp > rightScaleRoundedUp) {
- result.setInteger(resultIndex, left.getInteger(leftIndex));
+ result.setInteger(resultIndex, ${in1}.getInteger(leftIndex));
leftIndex--;
resultIndex--;
leftScaleRoundedUp--;
@@ -152,7 +152,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
}
while (rightScaleRoundedUp > leftScaleRoundedUp) {
- result.setInteger((resultIndex), right.getInteger(rightIndex));
+ result.setInteger((resultIndex), ${in2}.getInteger(rightIndex));
rightIndex--;
resultIndex--;
rightScaleRoundedUp--;
@@ -164,7 +164,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
/* now the two scales are at the same level, we can add them */
while (resultScaleRoundedUp > 0) {
- sum += left.getInteger(leftIndex) + right.getInteger(rightIndex);
+ sum += ${in1}.getInteger(leftIndex) + ${in2}.getInteger(rightIndex);
leftIndex--;
rightIndex--;
@@ -182,7 +182,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
/* add the integer part */
while (leftIndex >= 0 && rightIndex >= 0) {
- sum += left.getInteger(leftIndex) + right.getInteger(rightIndex);
+ sum += ${in1}.getInteger(leftIndex) + ${in2}.getInteger(rightIndex);
leftIndex--;
rightIndex--;
@@ -197,7 +197,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
}
while (resultIndex >= 0 && leftIndex >= 0) {
- sum += left.getInteger(leftIndex);
+ sum += ${in1}.getInteger(leftIndex);
leftIndex--;
if (sum >= org.apache.drill.common.util.DecimalUtility.DIGITS_BASE) {
@@ -210,7 +210,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
}
while (resultIndex >= 0 && rightIndex >= 0) {
- sum += right.getInteger(rightIndex);
+ sum += ${in2}.getInteger(rightIndex);
rightIndex--;
if (sum >= org.apache.drill.common.util.DecimalUtility.DIGITS_BASE) {
@@ -294,7 +294,7 @@ public class ${type.name}Functions {
* becomes addition
*/
if (left.sign != right.sign) {
- <@addBlock holderType=type.name left="left" right="right" result="result"/>
+ <@addBlock holderType=type.name in1="left" in2="right" result="result"/>
result.sign = left.sign;
} else {
/* Sign of the inputs are the same, meaning we have to perform subtraction
@@ -305,7 +305,9 @@ public class ${type.name}Functions {
<@compareBlock holderType=type.name left="left" right="right" absCompare="true" output="cmp"/>
if (cmp == -1) {
- left.swap(right);
+ <@subtractBlock holderType=type.name in1="right" in2="left" result="result"/>
+ } else {
+ <@subtractBlock holderType=type.name in1="left" in2="right" result="result"/>
}
//Determine the sign of the result
@@ -314,10 +316,6 @@ public class ${type.name}Functions {
} else {
result.sign = false;
}
-
- // Perform the subtraction
- <@subtractBlock holderType=type.name left="left" right="right" result="result"/>
-
}
}
@@ -356,21 +354,19 @@ public class ${type.name}Functions {
<@compareBlock holderType=type.name left="left" right="right" absCompare="true" output="cmp"/>
if (cmp == -1) {
- left.swap(right);
+ <@subtractBlock holderType=type.name in1="right" in2="left" result="result"/>
+ result.sign = right.sign;
+ } else {
+ <@subtractBlock holderType=type.name in1="left" in2="right" result="result"/>
+ result.sign = left.sign;
}
- /* Perform the subtraction */
- <@subtractBlock holderType=type.name left="left" right="right" result="result"/>
+
+
} else {
/* Sign of the two input decimals is the same, use the add logic */
- <@addBlock holderType=type.name left="left" right="right" result="result"/>
+ <@addBlock holderType=type.name in1="left" in2="right" result="result"/>
+ result.sign = left.sign;
}
-
- /* Assign the result to be the sign of the left input
- * If the two input signs are the same, we can choose either to be the resulting sign
- * If the two input signs are different, we assign left input to be the greater absolute value
- * hence result will have the same sign as left
- */
- result.sign = left.sign;
}
}
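The template change above removes the holder swap: instead of mutating the inputs so the larger-magnitude operand is always on the left, the subtract macro is parameterized with in1/in2 and invoked with operands in whichever order is needed, leaving the inputs untouched. A compact sketch of that swap-free signed-magnitude subtraction, with hypothetical types standing in for the generated decimal holders (Java 17):

public class SwapFreeSubtract {
  record SignedMag(boolean negative, int magnitude) {}

  // corresponds to the subtractBlock macro: assumes |a| >= |b|
  static int subtractMagnitude(int a, int b) {
    return a - b;
  }

  static SignedMag subtract(SignedMag left, SignedMag right) {
    if (left.negative() != right.negative()) {
      // different signs: subtraction becomes addition, result keeps left's sign
      return new SignedMag(left.negative(), left.magnitude() + right.magnitude());
    }
    if (left.magnitude() >= right.magnitude()) {
      // same sign, |left| >= |right|: subtract in place order
      return new SignedMag(left.negative(),
          subtractMagnitude(left.magnitude(), right.magnitude()));
    }
    // same sign, |left| < |right|: call with operands reversed instead of
    // swapping the inputs, and flip the resulting sign
    return new SignedMag(!left.negative(),
        subtractMagnitude(right.magnitude(), left.magnitude()));
  }

  public static void main(String[] args) {
    SignedMag a = new SignedMag(false, 3);
    SignedMag b = new SignedMag(false, 5);
    System.out.println(subtract(a, b)); // SignedMag[negative=true, magnitude=2]
    System.out.println(a);              // inputs unchanged: SignedMag[negative=false, magnitude=3]
  }
}

Since neither input is mutated, the operand holders can safely be reused by later expressions, which is presumably the hazard the removed swap() hack created.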
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a78ff8c/exec/java-exec/src/main/codegen/templates/ValueHolders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ValueHolders.java b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
index 7272f4f..5cea7e3 100644
--- a/exec/java-exec/src/main/codegen/templates/ValueHolders.java
+++ b/exec/java-exec/src/main/codegen/templates/ValueHolders.java
@@ -82,33 +82,6 @@ public final class ${className} implements ValueHolder{
buffer.setInt(start + (index * 4), value);
}
- // TODO: This is a temporary hack to swap holders. We need a generic solution for this issue
- public void swap(${className} right) {
- int tempScale = this.scale;
- int tempPrec = this.precision;
- boolean tempSign = this.sign;
- ByteBuf tempBuf = this.buffer;
- int start = this.start;
-
- this.scale = right.scale;
- this.precision = right.precision;
- this.sign = right.sign;
- this.buffer = right.buffer;
- this.start = right.start;
-
- right.scale = tempScale;
- right.precision = tempPrec;
- right.sign = tempSign;
- right.buffer = tempBuf;
- right.start = start;
-
- <#if mode.prefix == "Nullable">
- int isSet = this.isSet;
- this.isSet = right.isSet;
- right.isSet = isSet;
- </#if>
- }
-
<#else>
public ${minor.javaType!type.javaType} value;
</#if>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5a78ff8c/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
index 78bcd95..05884e5 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
@@ -439,4 +439,17 @@ public class TestFunctionsQuery {
JdbcAssert.withNoDefaultSchema()
.sql(query);
}
+
+ @Test
+ public void testDecimalAddConstant() throws Exception {
+ String query = "select (cast('-1' as decimal(38, 3)) + cast (employee_id as decimal(38, 3))) as CNT " +
+ "from cp.`employee.json` where employee_id <= 4";
+
+ JdbcAssert.withNoDefaultSchema()
+ .sql(query)
+ .returns(
+ "CNT=0.000\n" +
+ "CNT=1.000\n" +
+ "CNT=3.000\n");
+ }
}
[13/14] git commit: DRILL-783: Convert function support in HBase filter push down.
Posted by ja...@apache.org.
DRILL-783: Convert function support in HBase filter push down.
+ Enable HBase test suite (failures fixed by DRILL-761).
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e9e63c4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e9e63c4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e9e63c4a
Branch: refs/heads/master
Commit: e9e63c4a7f3ac354dc346799f19bdccc56d8bd0e
Parents: 6d4dc8f
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Fri May 16 17:48:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:32 2014 -0700
----------------------------------------------------------------------
.../common/expression/ConvertExpression.java | 23 +-
.../expression/ExpressionStringBuilder.java | 2 +-
.../store/hbase/CompareFunctionsProcessor.java | 232 +++++++++++++++++++
.../exec/store/hbase/HBaseFilterBuilder.java | 74 ++----
.../org/apache/drill/hbase/HBaseTestsSuite.java | 1 -
.../drill/hbase/TestHBaseFilterPushDown.java | 9 +-
.../drill/exec/expr/EvaluationVisitor.java | 2 +-
.../exec/expr/ExpressionTreeMaterializer.java | 2 +-
8 files changed, 278 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java b/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
index c028083..9debd15 100644
--- a/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
+++ b/common/src/main/java/org/apache/drill/common/expression/ConvertExpression.java
@@ -28,23 +28,26 @@ public class ConvertExpression extends LogicalExpressionBase implements Iterable
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConvertExpression.class);
+ public static final String CONVERT_FROM = "convert_from";
+ public static final String CONVERT_TO = "convert_to";
+
private final LogicalExpression input;
private final MajorType type;
private final String convertFunction;
- private final String conversionType;
+ private final String encodingType;
/**
- * @param conversionType
+ * @param encodingType
* @param convertFunction
* @param input
* @param pos
*/
- public ConvertExpression(String convertFunction, String conversionType, LogicalExpression input, ExpressionPosition pos) {
+ public ConvertExpression(String convertFunction, String encodingType, LogicalExpression input, ExpressionPosition pos) {
super(pos);
this.input = input;
- this.convertFunction = convertFunction.toLowerCase();
- this.conversionType = conversionType.toUpperCase();
- this.type = Types.getMajorTypeFromName(conversionType.split("_", 2)[0].toLowerCase());
+ this.convertFunction = CONVERT_FROM.equals(convertFunction.toLowerCase()) ? CONVERT_FROM : CONVERT_TO;
+ this.encodingType = encodingType.toUpperCase();
+ this.type = Types.getMajorTypeFromName(encodingType.split("_", 2)[0].toLowerCase());
}
@Override
@@ -70,13 +73,13 @@ public class ConvertExpression extends LogicalExpressionBase implements Iterable
return type;
}
- public String getConversionType() {
- return conversionType;
+ public String getEncodingType() {
+ return encodingType;
}
@Override
public String toString() {
- return "ConvertExpression [input=" + input + ", type=" + type + ", convertFunction="
- + convertFunction + ", conversionType=" + conversionType + "]";
+ return "ConvertExpression [input=" + input + ", type=" + Types.toString(type) + ", convertFunction="
+ + convertFunction + ", conversionType=" + encodingType + "]";
}
}
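Note how the constructor above derives the Drill type from the encoding name: the token before the first underscore names the type, the remainder describes the representation. A short illustration mirroring the split("_", 2) call (plain Java, not Drill code):

public class EncodingToType {
  static String typeName(String encodingType) {
    return encodingType.toUpperCase().split("_", 2)[0].toLowerCase();
  }

  public static void main(String[] args) {
    System.out.println(typeName("TIME_EPOCH_BE"));  // time
    System.out.println(typeName("BIGINT_HADOOPV")); // bigint
    System.out.println(typeName("UTF8"));           // utf8
  }
}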
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java b/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
index 9588863..9301528 100644
--- a/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
+++ b/common/src/main/java/org/apache/drill/common/expression/ExpressionStringBuilder.java
@@ -226,7 +226,7 @@ public class ExpressionStringBuilder extends AbstractExprVisitor<Void, StringBui
public Void visitConvertExpression(ConvertExpression e, StringBuilder sb) throws RuntimeException {
sb.append(e.getConvertFunction()).append("(");
e.getInput().accept(this, sb);
- sb.append(", \"").append(e.getConversionType()).append("\")");
+ sb.append(", \"").append(e.getEncodingType()).append("\")");
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
new file mode 100644
index 0000000..6810f81
--- /dev/null
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/CompareFunctionsProcessor.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hbase;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.nio.ByteOrder;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+ private byte[] value;
+ private boolean success;
+ private boolean isEqualityFn;
+ private SchemaPath path;
+ private String functionName;
+
+ public static boolean isCompareFunction(String functionName) {
+ return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+ }
+
+ public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) {
+ String functionName = call.getName();
+ LogicalExpression nameArg = call.args.get(0);
+ LogicalExpression valueArg = call.args.size() == 2 ? call.args.get(1) : null;
+ CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
+
+ if (valueArg != null) { // binary function
+ if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+ LogicalExpression swapArg = valueArg;
+ valueArg = nameArg;
+ nameArg = swapArg;
+ evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+ }
+ evaluator.success = nameArg.accept(evaluator, valueArg);
+ } else if (nullComparatorSupported && call.args.get(0) instanceof SchemaPath) {
+ evaluator.success = true;
+ evaluator.path = (SchemaPath) nameArg;
+ }
+
+ return evaluator;
+ }
+
+ public CompareFunctionsProcessor(String functionName) {
+ this.success = false;
+ this.functionName = functionName;
+ this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
+ && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public SchemaPath getPath() {
+ return path;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ @Override
+ public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
+ if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
+ return e.getInput().accept(this, valueArg);
+ }
+ return false;
+ }
+
+ @Override
+ public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
+ if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM && e.getInput() instanceof SchemaPath) {
+ ByteBuf bb = null;
+ String encodingType = e.getEncodingType();
+ switch (encodingType) {
+ case "INT_BE":
+ case "INT":
+ case "UINT_BE":
+ case "UINT":
+ case "UINT4_BE":
+ case "UINT4":
+ if (valueArg instanceof IntExpression
+ && (isEqualityFn || encodingType.startsWith("U"))) {
+ bb = Unpooled.wrappedBuffer(new byte[4]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb.writeInt(((IntExpression)valueArg).getInt());
+ }
+ break;
+ case "BIGINT_BE":
+ case "BIGINT":
+ case "UINT8_BE":
+ case "UINT8":
+ if (valueArg instanceof LongExpression
+ && (isEqualityFn || encodingType.startsWith("U"))) {
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb.writeLong(((LongExpression)valueArg).getLong());
+ }
+ break;
+ case "FLOAT":
+ if (valueArg instanceof FloatExpression && isEqualityFn) {
+ bb = Unpooled.wrappedBuffer(new byte[4]).order(ByteOrder.BIG_ENDIAN);
+ bb.writeFloat(((FloatExpression)valueArg).getFloat());
+ }
+ break;
+ case "DOUBLE":
+ if (valueArg instanceof DoubleExpression && isEqualityFn) {
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(ByteOrder.BIG_ENDIAN);
+ bb.writeDouble(((DoubleExpression)valueArg).getDouble());
+ }
+ break;
+ case "TIME_EPOCH":
+ case "TIME_EPOCH_BE":
+ if (valueArg instanceof TimeExpression) {
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb.writeLong(((TimeExpression)valueArg).getTime());
+ }
+ break;
+ case "DATE_EPOCH":
+ case "DATE_EPOCH_BE":
+ if (valueArg instanceof DateExpression) {
+ bb = Unpooled.wrappedBuffer(new byte[8]).order(encodingType.endsWith("_BE") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
+ bb.writeLong(((DateExpression)valueArg).getDate());
+ }
+ break;
+ case "BOOLEAN_BYTE":
+ if (valueArg instanceof BooleanExpression) {
+ bb = Unpooled.wrappedBuffer(new byte[1]);
+ bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
+ }
+ break;
+ case "UTF8":
+ // let visitSchemaPath() handle this.
+ return e.getInput().accept(this, valueArg);
+ }
+
+ if (bb != null) {
+ this.value = bb.array();
+ this.path = (SchemaPath)e.getInput();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+ if (valueArg instanceof QuotedString) {
+ this.value = ((QuotedString) valueArg).value.getBytes();
+ this.path = path;
+ return true;
+ }
+ return false;
+ }
+
+ private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+ static {
+ ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+ VALUE_EXPRESSION_CLASSES = builder
+ .add(BooleanExpression.class)
+ .add(DateExpression.class)
+ .add(DoubleExpression.class)
+ .add(FloatExpression.class)
+ .add(IntExpression.class)
+ .add(LongExpression.class)
+ .add(QuotedString.class)
+ .add(TimeExpression.class)
+ .build();
+ }
+
+ private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+ static {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+ // unary functions
+ .put("isnotnull", "isnotnull")
+ .put("isNotNull", "isNotNull")
+ .put("is not null", "is not null")
+ .put("isnull", "isnull")
+ .put("isNull", "isNull")
+ .put("is null", "is null")
+ // binary functions
+ .put("equal", "equal")
+ .put("not_equal", "not_equal")
+ .put("greater_than_or_equal_to", "less_than_or_equal_to")
+ .put("greater_than", "less_than")
+ .put("less_than_or_equal_to", "greater_than_or_equal_to")
+ .put("less_than", "greater_than")
+ .build();
+ }
+
+}
\ No newline at end of file
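A detail worth calling out in visitConvertExpression() above: for the signed INT/BIGINT encodings a value is only captured for equality comparisons (isEqualityFn), while the unsigned encodings also admit range comparisons. HBase orders keys by unsigned lexicographic byte comparison, which agrees with numeric order for big-endian non-negative values but not for little-endian ones. A plain-JDK demonstration (not Drill code):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteOrderCheck {
  static byte[] enc(int v, ByteOrder order) {
    return ByteBuffer.allocate(4).order(order).putInt(v).array();
  }

  // unsigned lexicographic comparison, the way HBase orders keys
  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < a.length; i++) {
      int d = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (d != 0) return d;
    }
    return 0;
  }

  public static void main(String[] args) {
    int x = 1, y = 256; // x < y numerically
    System.out.println(compareUnsigned(enc(x, ByteOrder.BIG_ENDIAN),
                                       enc(y, ByteOrder.BIG_ENDIAN)) < 0);    // true: order preserved
    System.out.println(compareUnsigned(enc(x, ByteOrder.LITTLE_ENDIAN),
                                       enc(y, ByteOrder.LITTLE_ENDIAN)) < 0); // false: order lost
  }
}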
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
index 924cd6e..ad26972 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -19,13 +19,10 @@ package org.apache.drill.exec.store.hbase;
import java.util.Arrays;
-import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -36,8 +33,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
@@ -54,7 +49,17 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
public HBaseScanSpec parseTree() {
HBaseScanSpec parsedSpec = le.accept(this, null);
- return parsedSpec != null ? mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec ) : null;
+ if (parsedSpec != null) {
+ parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec);
+ /*
+ * If RowFilter is THE filter attached to the scan specification,
+ * remove it since its effect is also achieved through startRow and stopRow.
+ */
+ if (parsedSpec.filter instanceof RowFilter) {
+ parsedSpec.filter = null;
+ }
+ }
+ return parsedSpec;
}
public boolean isAllExpressionsConverted() {
@@ -72,22 +77,18 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
HBaseScanSpec nodeScanSpec = null;
String functionName = call.getName();
ImmutableList<LogicalExpression> args = call.args;
- if (COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)) {
- LogicalExpression nameArg = args.get(0);
- LogicalExpression valueArg = args.get(1);
- if (nameArg instanceof QuotedString) {
- valueArg = nameArg;
- nameArg = args.get(1);
- functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
- }
- while (nameArg instanceof CastExpression
- && nameArg.getMajorType().getMinorType() == MinorType.VARCHAR) {
- nameArg = ((CastExpression) nameArg).getInput();
- }
+ if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
+ /*
+ * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
+ * causes a filter with NullComparator to fail. Enable only if specified in
+ * the configuration (after ensuring that the HBase cluster has the fix).
+ */
+ boolean nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false);
- if (nameArg instanceof SchemaPath && valueArg instanceof QuotedString) {
- nodeScanSpec = createHBaseScanSpec(functionName, (SchemaPath) nameArg, ((QuotedString) valueArg).value.getBytes());
+ CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported);
+ if (processor.isSuccess()) {
+ nodeScanSpec = createHBaseScanSpec(processor.getFunctionName(), processor.getPath(), processor.getValue());
}
} else {
switch (functionName) {
@@ -104,28 +105,13 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
}
}
break;
- case "isnotnull":
- case "isNotNull":
- case "is not null":
- case "isnull":
- case "isNull":
- case "is null":
- /*
- * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
- * causes a filter with NullComparator to fail. Enable only if specified in
- * the configuration (after ensuring that the HBase cluster has the fix).
- */
- if (groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false)) {
- if (args.get(0) instanceof SchemaPath) {
- nodeScanSpec = createHBaseScanSpec(functionName, ((SchemaPath) args.get(0)), null);
- }
- }
}
}
if (nodeScanSpec == null) {
allExpressionsConverted = false;
}
+
return nodeScanSpec;
}
@@ -182,7 +168,8 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
case "greater_than":
compareOp = CompareOp.GREATER;
if (isRowKey) {
- startRow = fieldValue;
+ // startRow should be just greater than 'value'
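+ // copying into a one-byte-longer array pads with 0x00, the smallest key that sorts after fieldValue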
+ startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
}
break;
case "less_than_or_equal_to":
@@ -240,17 +227,4 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
return null;
}
- private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
- static {
- Builder<String, String> builder = ImmutableMap.builder();
- COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
- .put("equal", "equal")
- .put("not_equal", "not_equal")
- .put("greater_than_or_equal_to", "less_than_or_equal_to")
- .put("greater_than", "less_than")
- .put("less_than_or_equal_to", "greater_than_or_equal_to")
- .put("less_than", "greater_than")
- .build();
- }
-
}
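The HBASE-10848 guard above only takes effect when the HBase configuration carries the "drill.hbase.supports.null.comparator" flag. A minimal sketch of enabling it programmatically, assuming a standard Hadoop/HBase client Configuration (hypothetical setup code, not part of this commit; set the flag only after verifying the cluster runs an HBase release with the HBASE-10848 fix):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableNullComparatorFlag {
      public static void main(String[] args) {
        // create() also picks up hbase-site.xml from the classpath, where the
        // flag could be set declaratively instead
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("drill.hbase.supports.null.comparator", true);
        System.out.println(conf.getBoolean("drill.hbase.supports.null.comparator", false)); // prints: true
      }
    }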
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 3e91361..3881f4d 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -33,7 +33,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
-@Ignore("Need to fix HBaseRecordReader")
@RunWith(Suite.class)
@SuiteClasses({
HBaseRecordReaderTest.class,
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 6dd418c..76300b7 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.hbase;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
import org.junit.Ignore;
import org.junit.Test;
@@ -56,15 +60,14 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
}
@Test
- @Ignore("Until convert_from() functions are working.")
public void testFilterPushDownConvertExpression() throws Exception {
runSQLVerifyCount("SELECT\n"
+ " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
+ "WHERE\n"
- + " convert_from(row_key, 'INT_BE') > 12"
- , -1);
+ + " convert_from(row_key, 'UTF8') > 'b4'"
+ , 2);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 731ab6b..b03882e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -541,7 +541,7 @@ public class EvaluationVisitor {
@Override
public HoldingContainer visitConvertExpression(ConvertExpression e, ClassGenerator<?> value) throws RuntimeException {
- String convertFunctionName = e.getConvertFunction() + e.getConversionType();
+ String convertFunctionName = e.getConvertFunction() + e.getEncodingType();
List<LogicalExpression> newArgs = Lists.newArrayList();
newArgs.add(e.getInput()); //input_expr
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9e63c4a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 0267be3..6297148 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -326,7 +326,7 @@ public class ExpressionTreeMaterializer {
@Override
public LogicalExpression visitConvertExpression(ConvertExpression e, FunctionImplementationRegistry value) {
- String convertFunctionName = e.getConvertFunction() + e.getConversionType();
+ String convertFunctionName = e.getConvertFunction() + e.getEncodingType();
List<LogicalExpression> newArgs = Lists.newArrayList();
newArgs.add(e.getInput()); //input_expr
[14/14] git commit: DRILL-781: Use MapVector as the top level vector for HBase Column Families
Posted by ja...@apache.org.
DRILL-781: Use MapVector as the top level vector for HBase Column Families
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d7e3d3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d7e3d3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d7e3d3a
Branch: refs/heads/master
Commit: 5d7e3d3ab548eb2b23607df46ea843a9c1532b72
Parents: e9e63c4
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon May 19 15:28:09 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:36 2014 -0700
----------------------------------------------------------------------
.../exec/store/hbase/HBaseRecordReader.java | 160 ++++++-------------
.../exec/store/hbase/HBaseSchemaFactory.java | 1 -
.../org/apache/drill/hbase/HBaseTestsSuite.java | 1 -
.../drill/hbase/TestHBaseFilterPushDown.java | 16 +-
.../drill/hbase/TestHBaseProjectPushDown.java | 7 +-
.../drill/exec/record/MaterializedField.java | 3 +-
6 files changed, 69 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
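With this change each HBase column family becomes one top-level MapVector and each qualifier a NullableVarBinary child inside it (see getOrCreateFamilyVector and getOrCreateColumnVector in the diff below). A conceptual sketch of that nesting in plain Java collections, not Drill's vector classes:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class FamilyMapLayoutSketch {
      public static void main(String[] args) {
        // family name -> (qualifier -> value), mirroring one MapVector per
        // family with a NullableVarBinary child vector per qualifier
        Map<String, Map<String, byte[]>> row = new LinkedHashMap<>();
        row.computeIfAbsent("f",  k -> new LinkedHashMap<>()).put("c1", "v1".getBytes());
        row.computeIfAbsent("f",  k -> new LinkedHashMap<>()).put("c2", "v2".getBytes());
        row.computeIfAbsent("f2", k -> new LinkedHashMap<>()).put("c7", "v7".getBytes());
        System.out.println(row.keySet()); // prints: [f, f2] -- one top-level entry per family
      }
    }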
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index ae9f833..439f97f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -50,17 +52,17 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
private static final int TARGET_RECORD_COUNT = 4000;
- private List<SchemaPath> columns;
+ private LinkedHashSet<SchemaPath> columns;
private OutputMutator outputMutator;
- private Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
+ private Map<String, MapVector> familyVectorMap;
private VarBinaryVector rowKeyVector;
private SchemaPath rowKeySchemaPath;
@@ -78,34 +80,29 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
hbaseTable = subScanSpec.getTableName();
hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
boolean rowKeyOnly = true;
+ this.columns = Sets.newLinkedHashSet();
if (projectedColumns != null && projectedColumns.size() != 0) {
- /*
- * This will change once the non-scaler value vectors are available.
- * Then, each column family will have a single top level value vector
- * and each column will be an item vector in its corresponding TLV.
- */
- this.columns = Lists.newArrayList(projectedColumns);
- Iterator<SchemaPath> columnIterator = columns.iterator();
+ Iterator<SchemaPath> columnIterator = projectedColumns.iterator();
while(columnIterator.hasNext()) {
SchemaPath column = columnIterator.next();
- if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) {
+ if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) {
rowKeySchemaPath = ROW_KEY_PATH;
+ this.columns.add(rowKeySchemaPath);
continue;
}
rowKeyOnly = false;
NameSegment root = column.getRootSegment();
- byte[] family = root.getPath().toString().getBytes();
+ byte[] family = root.getPath().getBytes();
+ this.columns.add(SchemaPath.getSimplePath(root.getPath()));
PathSegment child = root.getChild();
if (child != null && child.isNamed()) {
- byte[] qualifier = child.getNameSegment().getPath().toString().getBytes();
+ byte[] qualifier = child.getNameSegment().getPath().getBytes();
hbaseScan.addColumn(family, qualifier);
} else {
- columnIterator.remove();
hbaseScan.addFamily(family);
}
}
} else {
- this.columns = Lists.newArrayList();
rowKeyOnly = false;
rowKeySchemaPath = ROW_KEY_PATH;
this.columns.add(rowKeySchemaPath);
@@ -128,16 +125,16 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
this.outputMutator = output;
- vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
+ familyVectorMap = new HashMap<String, MapVector>();
try {
// Add Vectors to output in the order specified when creating reader
for (SchemaPath column : columns) {
if (column.equals(rowKeySchemaPath)) {
MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
- rowKeyVector = output.addField(field, VarBinaryVector.class);
- } else if (column.getRootSegment().getChild() != null) {
- getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
+ rowKeyVector = outputMutator.addField(field, VarBinaryVector.class);
+ } else {
+ getOrCreateFamilyVector(column.getRootSegment().getPath(), false);
}
}
logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
@@ -158,12 +155,14 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
rowKeyVector.clear();
rowKeyVector.allocateNew();
}
- for (ValueVector v : vvMap.values()) {
+ for (ValueVector v : familyVectorMap.values()) {
v.clear();
v.allocateNew();
}
- for (int count = 0; count < TARGET_RECORD_COUNT; count++) {
+ int rowCount = 0;
+ done:
+ for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
Result result = null;
try {
if (leftOver != null) {
@@ -176,54 +175,54 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
throw new DrillRuntimeException(e);
}
if (result == null) {
- setOutputValueCount(count);
- logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
- return count;
+ break done;
}
// parse the result and populate the value vectors
KeyValue[] kvs = result.raw();
byte[] bytes = result.getBytes().get();
if (rowKeyVector != null) {
- if (!rowKeyVector.getMutator().setSafe(count, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) {
- setOutputValueCount(count);
+ if (!rowKeyVector.getMutator().setSafe(rowCount, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) {
leftOver = result;
- logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
- return count;
+ break done;
}
}
+
for (KeyValue kv : kvs) {
int familyOffset = kv.getFamilyOffset();
int familyLength = kv.getFamilyLength();
+ MapVector mv = getOrCreateFamilyVector(new String(bytes, familyOffset, familyLength), true);
+
int qualifierOffset = kv.getQualifierOffset();
int qualifierLength = kv.getQualifierLength();
+ NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(bytes, qualifierOffset, qualifierLength));
+
int valueOffset = kv.getValueOffset();
int valueLength = kv.getValueLength();
- NullableVarBinaryVector v = getOrCreateColumnVector(
- new FamilyQualifierWrapper(bytes, familyOffset, familyLength, qualifierOffset, qualifierLength), true);
- if (!v.getMutator().setSafe(count, bytes, valueOffset, valueLength)) {
- setOutputValueCount(count);
+ if (!v.getMutator().setSafe(rowCount, bytes, valueOffset, valueLength)) {
leftOver = result;
- logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
- return count;
+ return rowCount;
}
}
}
- setOutputValueCount(TARGET_RECORD_COUNT);
- logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), TARGET_RECORD_COUNT);
- return TARGET_RECORD_COUNT;
+
+ setOutputRowCount(rowCount);
+ logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+ return rowCount;
}
- private NullableVarBinaryVector getOrCreateColumnVector(FamilyQualifierWrapper column, boolean allocateOnCreate) {
+ private MapVector getOrCreateFamilyVector(String familyName, boolean allocateOnCreate) {
try {
- NullableVarBinaryVector v = vvMap.get(column);
+ MapVector v = familyVectorMap.get(familyName);
if(v == null) {
- MaterializedField field = MaterializedField.create(column.asSchemaPath(), Types.optional(TypeProtos.MinorType.VARBINARY));
- v = outputMutator.addField(field, NullableVarBinaryVector.class);
+ SchemaPath column = SchemaPath.getSimplePath(familyName);
+ MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.MAP));
+ v = outputMutator.addField(field, MapVector.class);
if (allocateOnCreate) {
v.allocateNew();
}
- vvMap.put(column, v);
+ columns.add(column);
+ familyVectorMap.put(familyName, v);
}
return v;
} catch (SchemaChangeException e) {
@@ -231,6 +230,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
}
}
+ private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qualifier) {
+ int oldSize = mv.size();
+ NullableVarBinaryVector v = mv.addOrGet(qualifier, Types.optional(TypeProtos.MinorType.VARBINARY), NullableVarBinaryVector.class);
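+ // if mv.size() grew, addOrGet created a new child vector that still needs its buffers allocated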
+ if (oldSize != mv.size()) {
+ v.allocateNew();
+ }
+ return v;
+ }
+
@Override
public void cleanup() {
try {
@@ -245,8 +253,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
}
}
- private void setOutputValueCount(int count) {
- for (ValueVector vv : vvMap.values()) {
+ private void setOutputRowCount(int count) {
+ for (ValueVector vv : familyVectorMap.values()) {
vv.getMutator().setValueCount(count);
}
if (rowKeyVector != null) {
@@ -254,68 +262,4 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
}
}
- private static class FamilyQualifierWrapper implements Comparable<FamilyQualifierWrapper> {
- int hashCode;
- protected String stringVal;
- protected String family;
- protected String qualifier;
-
- public FamilyQualifierWrapper(SchemaPath column) {
- this(column.getRootSegment().getPath(), column.getRootSegment().getChild().getNameSegment().getPath());
- }
-
- public FamilyQualifierWrapper(byte[] bytes, int familyOffset, int familyLength, int qualifierOffset, int qualifierLength) {
- this(new String(bytes, familyOffset, familyLength), new String(bytes, qualifierOffset, qualifierLength));
- }
-
- public FamilyQualifierWrapper(String family, String qualifier) {
- this.family = family;
- this.qualifier = qualifier;
- hashCode = 31*family.hashCode() + qualifier.hashCode();
- }
-
- @Override
- public int hashCode() {
- return this.hashCode;
- }
-
- @Override
- public boolean equals(Object anObject) {
- if (this == anObject) {
- return true;
- }
- if (anObject instanceof FamilyQualifierWrapper) {
- FamilyQualifierWrapper that = (FamilyQualifierWrapper) anObject;
- // we compare qualifier first since many columns will have same family
- if (!qualifier.equals(that.qualifier)) {
- return false;
- }
- return family.equals(that.family);
- }
- return false;
- }
-
- @Override
- public String toString() {
- if (stringVal == null) {
- stringVal = new StringBuilder().append(new String(family)).append(".").append(new String(qualifier)).toString();
- }
- return stringVal;
- }
-
- public SchemaPath asSchemaPath() {
- return SchemaPath.getCompoundPath(family, qualifier);
- }
-
- @Override
- public int compareTo(FamilyQualifierWrapper o) {
- int val = family.compareTo(o.family);
- if (val != 0) {
- return val;
- }
- return qualifier.compareTo(o.qualifier);
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index ce3b9fd..84f363b 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 3881f4d..e30f79e 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 76300b7..90404b7 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,11 +17,6 @@
*/
package org.apache.drill.hbase;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-
-import org.junit.Ignore;
import org.junit.Test;
public class TestHBaseFilterPushDown extends BaseHBaseTest {
@@ -49,6 +44,17 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
}
@Test
+ public void testFilterPushDownRowKeyBetween() throws Exception {
+ runSQLVerifyCount("SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " hbase.`[TABLE_NAME]` tableName\n"
+ + "WHERE\n"
+ + " row_key BETWEEN 'a2' AND 'b4'"
+ , 3);
+ }
+
+ @Test
public void testFilterPushDownMultiColumns() throws Exception {
runSQLVerifyCount("SELECT\n"
+ " *\n"
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index 88194d5..b66d2ed 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -33,7 +33,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
@Test
public void testColumnWith1RowPushDown() throws Exception{
runSQLVerifyCount("SELECT\n"
- + "f2['c7']\n"
+ + "f2['c7'] as `f[c7]`\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName"
, 1);
@@ -43,7 +43,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
public void testRowKeyAndColumnPushDown() throws Exception{
setColumnWidth(9);
runSQLVerifyCount("SELECT\n"
- + "row_key, f['c1']*31 as `f['c1']*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n"
+ + "row_key, f['c1']*31 as `f[c1]*31`, f['c2'] as `f[c2]`, 5 as `5`, 'abc' as `'abc'`\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName"
, 6);
@@ -51,8 +51,9 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
@Test
public void testColumnFamilyPushDown() throws Exception{
+ setColumnWidth(74);
runSQLVerifyCount("SELECT\n"
- + "f\n"
+ + "f, f2\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName"
, 6);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 439552f..3d749d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
@@ -202,7 +203,7 @@ public class MaterializedField{
@Override
public String toString() {
- return "MaterializedField [path=" + path + ", type=" + type + "]";
+ return "MaterializedField [path=" + path + ", type=" + Types.toString(type) + "]";
}
public String toExpr(){
[05/14] git commit: DRILL-770: Fix decimal math functions with constants
Posted by ja...@apache.org.
DRILL-770: Fix decimal math functions with constants
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/62c0b1ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/62c0b1ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/62c0b1ba
Branch: refs/heads/master
Commit: 62c0b1ba384fd2b3c7a9af12ed75341a4294f331
Parents: 5a78ff8
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 18:31:57 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 10:43:26 2014 -0700
----------------------------------------------------------------------
.../codegen/templates/Decimal/DecimalFunctions.java | 2 ++
.../apache/drill/exec/resolver/TypeCastRules.java | 6 ------
.../apache/drill/jdbc/test/TestFunctionsQuery.java | 15 +++++++++++++++
3 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62c0b1ba/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
index 8f14e83..cff122e 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/DecimalFunctions.java
@@ -237,8 +237,10 @@ import org.apache.drill.exec.expr.annotations.Workspace;
if (left.scale < right.scale) {
left.value = (${javaType}) (left.value * Math.pow(10, (right.scale - left.scale)));
+ left.scale = right.scale;
} else if (right.scale < left.scale) {
right.value = (${javaType}) (right.value * Math.pow(10, (left.scale - right.scale)));
+ right.scale = left.scale;
}
</#macro>
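The two added lines update the scale alongside the scaled-up value; previously only the value was multiplied, leaving the recorded scale stale. A worked example of the alignment in plain Java (a sketch of the template's logic, not the generated code):

    public class ScaleAlignSketch {
      public static void main(String[] args) {
        long leftValue = 1;     int leftScale = 0;  // the integer constant 1
        long rightValue = 2345; int rightScale = 3; // the decimal literal 2.345
        if (leftScale < rightScale) {
          leftValue = (long) (leftValue * Math.pow(10, rightScale - leftScale)); // 1 -> 1000
          leftScale = rightScale; // the line the fix adds; without it the scales disagree
        }
        // with matching scales the raw values add directly: 1000 + 2345
        System.out.println((leftValue + rightValue) + " at scale " + leftScale); // 3345 at scale 3, i.e. 3.345
      }
    }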
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62c0b1ba/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
index 515843d..2f6bf38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
@@ -420,12 +420,6 @@ public class TypeCastRules {
rule.add(MinorType.UINT2);
rule.add(MinorType.UINT4);
rule.add(MinorType.UINT8);
- rule.add(MinorType.DECIMAL9);
- rule.add(MinorType.DECIMAL18);
- rule.add(MinorType.DECIMAL28SPARSE);
- rule.add(MinorType.DECIMAL28DENSE);
- rule.add(MinorType.DECIMAL38SPARSE);
- rule.add(MinorType.DECIMAL38DENSE);
rule.add(MinorType.DATE);
rule.add(MinorType.TIME);
rule.add(MinorType.TIMESTAMPTZ);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/62c0b1ba/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
index 05884e5..66ae477 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
@@ -452,4 +452,19 @@ public class TestFunctionsQuery {
"CNT=1.000\n" +
"CNT=3.000\n");
}
+
+ @Test
+ public void testDecimalAddIntConstant() throws Exception {
+ String query = "select 1 + cast(employee_id as decimal(9, 3)) as DEC_9 , 1 + cast(employee_id as decimal(38, 5)) as DEC_38 " +
+ "from cp.`employee.json` where employee_id <= 2";
+
+ JdbcAssert.withNoDefaultSchema()
+ .sql(query)
+ .returns(
+ "DEC_9=2.000; " +
+ "DEC_38=2.00000\n" +
+ "DEC_9=3.000; " +
+ "DEC_38=3.00000\n");
+ }
+
}
[08/14] git commit: DRILL-776: Support SelectionVector SV2 and SV4 in Partition Sender.
Posted by ja...@apache.org.
DRILL-776: Support SelectionVector SV2 and SV4 in Partition Sender.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e9ac37db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e9ac37db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e9ac37db
Branch: refs/heads/master
Commit: e9ac37dbff8f0b606673bb5827c0daac1d3275b0
Parents: c40735e
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Mon May 19 08:35:34 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:19 2014 -0700
----------------------------------------------------------------------
.../PartitionSenderRootExec.java | 69 +++++++++++++++-----
.../impl/partitionsender/Partitioner.java | 5 ++
.../partitionsender/PartitionerSV2Template.java | 60 +++++++++++++++++
.../partitionsender/PartitionerSV4Template.java | 61 +++++++++++++++++
.../physical/HashToRandomExchangePrel.java | 5 ++
.../org/apache/drill/TestExampleQueries.java | 10 +++
6 files changed, 193 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bcd484c..f574351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -182,8 +182,24 @@ public class PartitionSenderRootExec implements RootExec {
// set up partitioning function
final LogicalExpression expr = operator.getExpr();
final ErrorCollector collector = new ErrorCollectorImpl();
- final ClassGenerator<Partitioner> cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION,
- context.getFunctionRegistry());
+ final ClassGenerator<Partitioner> cg ;
+
+ boolean hyper = false;
+
+ switch(incoming.getSchema().getSelectionVectorMode()){
+ case NONE:
+ cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ break;
+ case TWO_BYTE:
+ cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV2, context.getFunctionRegistry());
+ break;
+ case FOUR_BYTE:
+ cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV4, context.getFunctionRegistry());
+ hyper = true;
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
@@ -255,22 +271,41 @@ public class PartitionSenderRootExec implements RootExec {
Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
vvIn.getField().getType().getMode());
JClass vvClass = cg.getModel().ref(vvType);
- // the following block generates calls to copyFrom(); e.g.:
- // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
- // outgoingBatches[bucket].getRecordCount(),
- // vv1);
- cg.getEvalBlock()._if(
- ((JExpression) JExpr.cast(vvClass,
- ((JExpression)
- outgoingVectors
- .component(bucket))
- .component(JExpr.lit(fieldId))))
- .invoke("copyFromSafe")
- .arg(inIndex)
- .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
- .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
- ._return();
+ if (!hyper) {
+ // the following block generates calls to copyFromSafe(); e.g.:
+ // ((IntVector) outgoingVectors[bucket][0]).copyFromSafe(inIndex,
+ // outgoingBatches[bucket].getRecordCount(),
+ // vv1);
+ cg.getEvalBlock()._if(
+ ((JExpression) JExpr.cast(vvClass,
+ ((JExpression)
+ outgoingVectors
+ .component(bucket))
+ .component(JExpr.lit(fieldId))))
+ .invoke("copyFromSafe")
+ .arg(inIndex)
+ .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
+ .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+ ._return();
+ } else {
+ // the following block generates calls to copyFromSafe(); e.g.:
+ // ((IntVector) outgoingVectors[bucket][0]).copyFromSafe(inIndex,
+ // outgoingBatches[bucket].getRecordCount(),
+ // vv1[((inIndex)>>> 16)]);
+ cg.getEvalBlock()._if(
+ ((JExpression) JExpr.cast(vvClass,
+ ((JExpression)
+ outgoingVectors
+ .component(bucket))
+ .component(JExpr.lit(fieldId))))
+ .invoke("copyFromSafe")
+ .arg(inIndex)
+ .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
+ .arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+ ._return();
+
+ }
++fieldId;
}
// generate the OutgoingRecordBatch helper invocations
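In the hyper (SV4) branch above, the generated code selects the inner vector with inIndex >>> 16 before copying. A conceptual sketch of how a four-byte selection entry packs a batch index and a record index (illustrative only; Drill's generated accessors handle the low bits internally):

    public class Sv4IndexSketch {
      public static void main(String[] args) {
        int sv4Entry = (3 << 16) | 42;       // hypothetical: record 42 of incoming batch 3
        int batchIndex  = sv4Entry >>> 16;   // selects the batch within the hyper batch
        int recordIndex = sv4Entry & 0xFFFF; // selects the record within that batch
        System.out.println(batchIndex + ", " + recordIndex); // prints: 3, 42
        // an SV2 entry, by contrast, is a plain two-byte record index into a single batch
      }
    }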
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 7d3998b..3ffead0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -31,4 +31,9 @@ public interface Partitioner {
public abstract void partitionBatch(RecordBatch incoming);
public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
+
+ public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV2 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV2Template.class);
+
+ public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV4 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV4Template.class);
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
new file mode 100644
index 0000000..981055a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class PartitionerSV2Template implements Partitioner {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV2Template.class);
+
+ private SelectionVector2 sv2;
+
+ public PartitionerSV2Template() throws SchemaChangeException {
+ }
+
+ @Override
+ public final void setup(FragmentContext context,
+ RecordBatch incoming,
+ OutgoingRecordBatch[] outgoing) throws SchemaChangeException {
+
+ this.sv2 = incoming.getSelectionVector2();
+
+ doSetup(context, incoming, outgoing);
+
+ }
+
+ @Override
+ public void partitionBatch(RecordBatch incoming) {
+
+ for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
+ // for each record
+ doEval(sv2.getIndex(recordId), 0);
+ }
+
+ }
+
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+ public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
new file mode 100644
index 0000000..2e00f9b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class PartitionerSV4Template implements Partitioner {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV4Template.class);
+
+ private SelectionVector4 sv4;
+
+ public PartitionerSV4Template() throws SchemaChangeException {
+ }
+
+ @Override
+ public final void setup(FragmentContext context,
+ RecordBatch incoming,
+ OutgoingRecordBatch[] outgoing) throws SchemaChangeException {
+
+ this.sv4 = incoming.getSelectionVector4();
+
+ doSetup(context, incoming, outgoing);
+
+ }
+
+ @Override
+ public void partitionBatch(RecordBatch incoming) {
+
+ for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
+ // for each record
+ doEval(sv4.get(recordId), 0);
+ }
+
+ }
+
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+ public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index d582684..9756a76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -115,5 +115,10 @@ public class HashToRandomExchangePrel extends SinglePrel {
return SelectionVectorMode.NONE;
}
+ @Override
+ public SelectionVectorMode[] getSupportedEncodings() {
+ return SelectionVectorMode.ALL;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 83b43fb..1757290 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -49,6 +49,16 @@ public class TestExampleQueries extends BaseTestQuery{
}
@Test
+ public void testHashPartitionSV2 () throws Exception{
+ test("select count(n_nationkey) from cp.`tpch/nation.parquet` where n_nationkey > 8 group by n_regionkey");
+ }
+
+ @Test
+ public void testHashPartitionSV4 () throws Exception{
+ test("select count(n_nationkey) as cnt from cp.`tpch/nation.parquet` group by n_regionkey order by cnt");
+ }
+
+ @Test
public void testSelectWithLimit() throws Exception{
test("select employee_id, first_name, last_name from cp.`employee.json` limit 5 ");
}
[09/14] git commit: DRILL-768: Support 2 phase COUNT() aggregates.
Posted by ja...@apache.org.
DRILL-768: Support 2 phase COUNT() aggregates.
Create a new SUM aggregate function whose return type is non-nullable to match the return type of COUNT.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/78ae2658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/78ae2658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/78ae2658
Branch: refs/heads/master
Commit: 78ae26589745b0ed538a15644f6fd6897cb9e5ad
Parents: e9ac37d
Author: Aman Sinha <as...@maprtech.com>
Authored: Fri May 16 10:33:10 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:39 2014 -0700
----------------------------------------------------------------------
.../exec/planner/physical/AggPrelBase.java | 180 +++++++++++++++++++
.../exec/planner/physical/AggPruleBase.java | 2 +-
.../exec/planner/physical/HashAggPrel.java | 51 +-----
.../exec/planner/physical/HashAggPrule.java | 10 +-
.../exec/planner/physical/StreamAggPrel.java | 72 ++------
.../exec/planner/physical/StreamAggPrule.java | 21 ++-
6 files changed, 221 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
new file mode 100644
index 0000000..c3b1188
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.Aggregation;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.SqlAggFunction;
+import org.eigenbase.sql.SqlFunctionCategory;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.type.OperandTypes;
+import org.eigenbase.sql.type.ReturnTypes;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.ImmutableList;
+
+
+public abstract class AggPrelBase extends AggregateRelBase implements Prel{
+
+ protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
+
+ protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase
+ protected List<NamedExpression> keys = Lists.newArrayList();
+ protected List<NamedExpression> aggExprs = Lists.newArrayList();
+ protected List<AggregateCall> phase2AggCallList = Lists.newArrayList();
+
+
+ /**
+ * Specialized aggregate function for SUMming the COUNTs. Since the return type of
+ * COUNT is non-nullable while the return type of SUM is nullable, this class enables
+ * creating a SUM whose return type is non-nullable.
+ *
+ */
+ public class SqlSumCountAggFunction extends SqlAggFunction {
+
+ private final RelDataType type;
+
+ public SqlSumCountAggFunction(RelDataType type) {
+ super("SUM",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.BIGINT, // use the inferred return type of SqlCountAggFunction
+ null,
+ OperandTypes.NUMERIC,
+ SqlFunctionCategory.NUMERIC);
+
+ this.type = type;
+ }
+
+ public List<RelDataType> getParameterTypes(RelDataTypeFactory typeFactory) {
+ return ImmutableList.of(type);
+ }
+
+ public RelDataType getType() {
+ return type;
+ }
+
+ public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
+ return type;
+ }
+
+ }
+
+ public AggPrelBase(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+ List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls);
+ this.operPhase = phase;
+ createKeysAndExprs();
+ }
+
+ public OperatorPhase getOperatorPhase() {
+ return operPhase;
+ }
+
+ public List<NamedExpression> getKeys() {
+ return keys;
+ }
+
+ public List<NamedExpression> getAggExprs() {
+ return aggExprs;
+ }
+
+ public List<AggregateCall> getPhase2AggCalls() {
+ return phase2AggCallList;
+ }
+
+ protected void createKeysAndExprs() {
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+ final List<String> fields = getRowType().getFieldNames();
+
+ for (int group : BitSets.toIter(groupSet)) {
+ FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+ keys.add(new NamedExpression(fr, fr));
+ }
+
+ for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+ int aggExprOrdinal = groupSet.cardinality() + aggCall.i;
+ FieldReference ref = new FieldReference(fields.get(aggExprOrdinal));
+ LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
+ NamedExpression ne = new NamedExpression(expr, ref);
+ aggExprs.add(ne);
+
+ if (getOperatorPhase() == OperatorPhase.PHASE_1of2) {
+ if (aggCall.e.getAggregation().getName().equals("COUNT")) {
+ // If we are doing a COUNT aggregate in Phase1of2, then in Phase2of2 we should SUM the COUNTs.
+ Aggregation sumAggFun = new SqlSumCountAggFunction(aggCall.e.getType());
+ AggregateCall newAggCall =
+ new AggregateCall(
+ sumAggFun,
+ aggCall.e.isDistinct(),
+ Collections.singletonList(aggExprOrdinal),
+ aggCall.e.getType(),
+ aggCall.e.getName());
+
+ phase2AggCallList.add(newAggCall);
+ } else {
+ phase2AggCallList.add(aggCall.e);
+ }
+ }
+ }
+ }
+
+ protected LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
+ List<LogicalExpression> args = Lists.newArrayList();
+ for(Integer i : call.getArgList()){
+ args.add(new FieldReference(fn.get(i)));
+ }
+
+ // for count(1).
+ if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
+ LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
+ return expr;
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return PrelUtil.iter(getChild());
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitPrel(this, value);
+ }
+
+}
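The intuition behind SqlSumCountAggFunction above: phase 1 computes a local COUNT per fragment, and phase 2 SUMs those partial counts after the hash exchange; since COUNT never returns NULL, the SUM over the counts must be non-nullable as well. A toy illustration in plain Java:

    import java.util.Arrays;

    public class TwoPhaseCountSketch {
      public static void main(String[] args) {
        long[] partialCounts = {3, 5, 2}; // hypothetical per-fragment phase-1 COUNT results
        long globalCount = Arrays.stream(partialCounts).sum(); // phase 2: SUM the COUNTs
        System.out.println(globalCount); // prints: 10, same as one global COUNT
      }
    }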
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 563458e..4edeaf8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -67,7 +67,7 @@ public abstract class AggPruleBase extends Prule {
for (AggregateCall aggCall : aggregate.getAggCallList()) {
String name = aggCall.getAggregation().getName();
- if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX"))) {
+ if ( ! (name.equals("SUM") || name.equals("MIN") || name.equals("MAX") || name.equals("COUNT"))) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index b2378be..31feb48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import net.hydromatic.linq4j.Ord;
import net.hydromatic.optiq.util.BitSets;
@@ -49,18 +50,19 @@ import org.eigenbase.relopt.RelTraitSet;
import com.beust.jcommander.internal.Lists;
-public class HashAggPrel extends AggregateRelBase implements Prel{
+public class HashAggPrel extends AggPrelBase implements Prel{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggPrel.class);
public HashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
- List<AggregateCall> aggCalls) throws InvalidRelException {
- super(cluster, traits, child, groupSet, aggCalls);
+ List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls, phase);
}
public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
try {
- return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+ return new HashAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls,
+ this.getOperatorPhase());
} catch (InvalidRelException e) {
throw new AssertionError(e);
}
@@ -88,54 +90,16 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- final List<String> childFields = getChild().getRowType().getFieldNames();
- final List<String> fields = getRowType().getFieldNames();
- List<NamedExpression> keys = Lists.newArrayList();
- List<NamedExpression> exprs = Lists.newArrayList();
-
- for (int group : BitSets.toIter(groupSet)) {
- FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
- keys.add(new NamedExpression(fr, fr));
- }
-
- for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
- FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
- LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
- exprs.add(new NamedExpression(expr, ref));
- }
-
Prel child = (Prel) this.getChild();
HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator),
keys.toArray(new NamedExpression[keys.size()]),
- exprs.toArray(new NamedExpression[exprs.size()]),
+ aggExprs.toArray(new NamedExpression[aggExprs.size()]),
1.0f);
return g;
}
- private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
- List<LogicalExpression> args = Lists.newArrayList();
- for(Integer i : call.getArgList()){
- args.add(new FieldReference(fn.get(i)));
- }
-
- // for count(1).
- if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
- LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
- return expr;
- }
-
- @Override
- public Iterator<Prel> iterator() {
- return PrelUtil.iter(getChild());
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
- return logicalVisitor.visitPrel(this, value);
- }
-
@Override
public SelectionVectorMode[] getSupportedEncodings() {
return SelectionVectorMode.DEFAULT;
@@ -146,5 +110,4 @@ public class HashAggPrel extends AggregateRelBase implements Prel{
return SelectionVectorMode.NONE;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 9395a1d..95c8362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -22,6 +22,7 @@ import java.util.logging.Logger;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
@@ -95,7 +96,8 @@ public class HashAggPrule extends AggPruleBase {
HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
HashToRandomExchangePrel exch =
new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
@@ -103,7 +105,9 @@ public class HashAggPrule extends AggPruleBase {
HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
+
call.transformTo(phase2Agg);
}
@@ -122,7 +126,7 @@ public class HashAggPrule extends AggPruleBase {
final RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits));
HashAggPrel newAgg = new HashAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
call.transformTo(newAgg);
}
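Taken together, the PHASE_1of2 / PHASE_2of2 plumbing above yields a distributed
aggregation plan: phase 1 pre-aggregates within each fragment, the exchange
redistributes the partial results on the grouping keys, and phase 2 merges
them. A worked example (table and column names are illustrative):

  SELECT deptId, COUNT(salary) FROM emp GROUP BY deptId

  HashAggPrel (PHASE_2of2): group by deptId, SUM over the partial counts
    HashToRandomExchangePrel: hash-distribute on deptId
      HashAggPrel (PHASE_1of2): group by deptId, COUNT(salary)
        Scan: emp

This is why the phase-2 node is built from phase1Agg.getPhase2AggCalls() rather
than the original aggregate.getAggCallList(): COUNT must be rewritten as a SUM
over the phase-1 partial counts, while self-merging functions such as SUM, MIN
and MAX carry through unchanged.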
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
index 5fb758a..9706254 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
@@ -19,24 +19,13 @@ package org.apache.drill.exec.planner.physical;
import java.io.IOException;
import java.util.BitSet;
-import java.util.Iterator;
import java.util.List;
-import net.hydromatic.linq4j.Ord;
-import net.hydromatic.optiq.util.BitSets;
-
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.SingleMergeExchange;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.eigenbase.rel.AggregateCall;
import org.eigenbase.rel.AggregateRelBase;
@@ -48,30 +37,27 @@ import org.eigenbase.relopt.RelOptCost;
import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTraitSet;
-import com.beust.jcommander.internal.Lists;
-
-public class StreamAggPrel extends AggregateRelBase implements Prel{
+public class StreamAggPrel extends AggPrelBase implements Prel{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamAggPrel.class);
+
+
public StreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
- List<AggregateCall> aggCalls) throws InvalidRelException {
- super(cluster, traits, child, groupSet, aggCalls);
- for (AggregateCall aggCall : aggCalls) {
- if (aggCall.isDistinct()) {
- throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
- }
- }
+ List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls, phase);
}
public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) {
try {
- return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls);
+ return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls,
+ this.getOperatorPhase());
} catch (InvalidRelException e) {
throw new AssertionError(e);
}
}
+
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
@@ -91,51 +77,15 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- final List<String> childFields = getChild().getRowType().getFieldNames();
- final List<String> fields = getRowType().getFieldNames();
- List<NamedExpression> keys = Lists.newArrayList();
- List<NamedExpression> exprs = Lists.newArrayList();
-
- for (int group : BitSets.toIter(groupSet)) {
- FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
- keys.add(new NamedExpression(fr, fr));
- }
-
- for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
- FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
- LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
- exprs.add(new NamedExpression(expr, ref));
- }
Prel child = (Prel) this.getChild();
- StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f);
+ StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]),
+ aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f);
return g;
}
-
- private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
- List<LogicalExpression> args = Lists.newArrayList();
- for(Integer i : call.getArgList()){
- args.add(new FieldReference(fn.get(i)));
- }
-
- // for count(1).
- if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
- LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN );
- return expr;
- }
-
- @Override
- public Iterator<Prel> iterator() {
- return PrelUtil.iter(getChild());
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
- return logicalVisitor.visitPrel(this, value);
- }
-
+
@Override
public SelectionVectorMode[] getSupportedEncodings() {
return SelectionVectorMode.ALL;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/78ae2658/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index ff648a4..9a60a14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -25,6 +25,7 @@ import net.hydromatic.optiq.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.eigenbase.rel.AggregateCall;
import org.eigenbase.rel.InvalidRelException;
@@ -59,8 +60,12 @@ public class StreamAggPrule extends AggPruleBase {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = aggregate.getChild();
RelCollation collation = getCollation(aggregate);
-
RelTraitSet traits = null;
+
+ if (aggregate.containsDistinctCall()) {
+ // Currently, don't use StreamingAggregate if any of the logical aggregates contains DISTINCT
+ return;
+ }
try {
if (aggregate.getGroupSet().isEmpty()) {
@@ -82,14 +87,16 @@ public class StreamAggPrule extends AggPruleBase {
StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
UnionExchangePrel exch =
new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
call.transformTo(phase2Agg);
}
@@ -135,7 +142,8 @@ public class StreamAggPrule extends AggPruleBase {
StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
@@ -147,7 +155,8 @@ public class StreamAggPrule extends AggPruleBase {
StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch,
aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
call.transformTo(phase2Agg);
}
@@ -166,7 +175,7 @@ public class StreamAggPrule extends AggPruleBase {
final RelNode convertedInput = convert(input, traits);
StreamAggPrel newAgg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
- aggregate.getAggCallList());
+ aggregate.getAggCallList(), OperatorPhase.PHASE_1of1);
call.transformTo(newAgg);
}
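The containsDistinctCall() guard added above keeps StreamAggPrule from firing
at all when any aggregate is DISTINCT, replacing the constructor-time
InvalidRelException that the old StreamAggPrel threw. The reason is that
DISTINCT partials cannot be merged by simple re-aggregation; a small worked
example of the failure mode:

  COUNT(DISTINCT x), with x spread across two fragments:
    fragment A sees {1, 2}  ->  phase-1 partial = 2
    fragment B sees {2, 3}  ->  phase-1 partial = 2
    phase-2 SUM of partials = 4, but COUNT(DISTINCT x) over {1, 2, 3} is 3.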
[03/14] git commit: DRILL-757: Output mutator interface changes -
Output mutator manages schema changes instead of record readers - Removed
usages of deprecated interface
Posted by ja...@apache.org.
DRILL-757: Output mutator interface changes
- Output mutator manages schema changes instead of record readers
- Removed usages of deprecated interface
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/828a5c69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/828a5c69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/828a5c69
Branch: refs/heads/master
Commit: 828a5c69501fd94a64359c6e3abd474f138dc92f
Parents: cb0d46f
Author: Mehant Baid <me...@gmail.com>
Authored: Thu May 15 18:03:28 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 02:34:31 2014 -0700
----------------------------------------------------------------------
.../exec/store/hbase/HBaseRecordReader.java | 4 -
.../drill/exec/physical/impl/OutputMutator.java | 14 ++-
.../drill/exec/physical/impl/ScanBatch.java | 99 +++++++++++---------
.../exec/store/easy/json/JSONRecordReader2.java | 3 +-
.../drill/exec/store/hive/HiveRecordReader.java | 7 +-
.../exec/store/ischema/RowRecordReader.java | 17 +---
.../drill/exec/store/mock/MockRecordReader.java | 18 ++--
.../exec/store/parquet/ParquetRecordReader.java | 11 ---
.../drill/exec/store/pojo/PojoRecordReader.java | 2 -
.../exec/store/text/DrillTextRecordReader.java | 6 +-
.../drill/exec/store/TestOutputMutator.java | 28 +++---
.../exec/store/ischema/TestOrphanSchema.java | 19 +++-
.../exec/store/ischema/TestTableProvider.java | 22 +++--
.../store/parquet/ParquetRecordReaderTest.java | 29 +++---
14 files changed, 136 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index dbf8123..ae9f833 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -128,7 +128,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
this.outputMutator = output;
- output.removeAllFields();
vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
try {
@@ -141,8 +140,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
}
}
- output.setNewSchema();
-
logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
hbaseTable, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
@@ -227,7 +224,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
v.allocateNew();
}
vvMap.put(column, v);
- outputMutator.setNewSchema();
}
return v;
} catch (SchemaChangeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 38d56ea..1aec625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -22,12 +22,16 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.ValueVector;
+import java.util.List;
+
public interface OutputMutator {
- public void removeField(MaterializedField field) throws SchemaChangeException;
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException ;
+ public void allocate(int recordCount);
+ public boolean isNewSchema();
- @Deprecated
- public void addField(ValueVector vector) throws SchemaChangeException ;
- public void removeAllFields();
- public void setNewSchema() throws SchemaChangeException;
+ /* TODO: This interface is added to support information schema tables,
+ * FixedTables, the way they exist currently.
+ * One to many layers to rip out, address it as a separate JIRA.
+ */
+ public void addFields(List<ValueVector> vvList);
}
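Under the revised interface a record reader no longer removes fields or calls
setNewSchema(); it asks the mutator for typed vectors and lets ScanBatch detect
the schema change, as the reader diffs later in this commit show. A minimal
sketch of a setup() written against the new contract (the column name and type
are illustrative):

  @Override
  public void setup(OutputMutator output) throws ExecutionSetupException {
    MaterializedField field = MaterializedField.create(
        SchemaPath.getSimplePath("myColumn"),
        Types.required(TypeProtos.MinorType.VARCHAR));
    try {
      // addField() creates the vector, registers it with the output container,
      // and flags a schema change inside the mutator if the vector is new.
      vector = output.addField(field, VarCharVector.class);
    } catch (SchemaChangeException e) {
      throw new ExecutionSetupException(e);
    }
    // No setNewSchema() here: ScanBatch polls isNewSchema() per batch and
    // sizes every registered vector itself via allocate(recordCount).
  }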
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a49d1a8..c0810c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -61,10 +61,11 @@ public class ScanBatch implements RecordBatch {
private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
+ final Map<MaterializedField, Class<?>> fieldVectorClassMap = Maps.newHashMap();
+ private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
private final VectorContainer container = new VectorContainer();
private int recordCount;
- private boolean schemaChanged = true;
private final FragmentContext context;
private final OperatorContext oContext;
private Iterator<RecordReader> readers;
@@ -123,6 +124,7 @@ public class ScanBatch implements RecordBatch {
@Override
public IterOutcome next() {
+ mutator.allocate(MAX_RECORD_CNT);
while ((recordCount = currentReader.next()) == 0) {
try {
if (!readers.hasNext()) {
@@ -133,8 +135,8 @@ public class ScanBatch implements RecordBatch {
currentReader.cleanup();
currentReader = readers.next();
partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
- mutator.removeAllFields();
currentReader.setup(mutator);
+ mutator.allocate(MAX_RECORD_CNT);
addPartitionVectors();
} catch (ExecutionSetupException e) {
this.context.fail(e);
@@ -144,28 +146,32 @@ public class ScanBatch implements RecordBatch {
}
populatePartitionVectors();
- if (schemaChanged) {
- schemaChanged = false;
+ if (mutator.isNewSchema()) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ schema = container.getSchema();
return IterOutcome.OK_NEW_SCHEMA;
} else {
return IterOutcome.OK;
}
}
- private void addPartitionVectors() {
- partitionVectors = Lists.newArrayList();
- for (int i : selectedPartitionColumns) {
- MaterializedField field;
- ValueVector v;
- if (partitionValues.length > i) {
- field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.required(MinorType.VARCHAR));
- v = new VarCharVector(field, context.getAllocator());
- } else {
- field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
- v = new NullableVarCharVector(field, context.getAllocator());
+ private void addPartitionVectors() throws ExecutionSetupException{
+ try {
+ partitionVectors = Lists.newArrayList();
+ for (int i : selectedPartitionColumns) {
+ MaterializedField field;
+ ValueVector v;
+ if (partitionValues.length > i) {
+ field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.required(MinorType.VARCHAR));
+ v = mutator.addField(field, VarCharVector.class);
+ } else {
+ field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
+ v = mutator.addField(field, NullableVarCharVector.class);
+ }
+ partitionVectors.add(v);
}
- mutator.addField(v);
- partitionVectors.add(v);
+ } catch(SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
}
}
@@ -212,43 +218,52 @@ public class ScanBatch implements RecordBatch {
private class Mutator implements OutputMutator {
- public void removeField(MaterializedField field) throws SchemaChangeException {
- ValueVector vector = fieldVectorMap.remove(field);
- if (vector == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
- container.remove(vector);
- vector.close();
- }
+ boolean schemaChange = true;
- public void addField(ValueVector vector) {
- container.add(vector);
- fieldVectorMap.put(vector.getField(), vector);
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
+ // Check if the field exists
+ ValueVector v = fieldVectorMap.get(field);
+
+ if (v == null || v.getClass() != clazz) {
+ // Field does not exist; add it to the map and the output container
+ v = TypeHelper.getNewVector(field, oContext.getAllocator());
+ if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ container.add(v);
+ fieldVectorMap.put(field, v);
+
+ // Adding new vectors to the container marks that the schema has changed
+ schemaChange = true;
+ }
+
+ return (T) v;
}
@Override
- public void removeAllFields() {
- for(VectorWrapper<?> vw : container){
- vw.clear();
+ public void addFields(List<ValueVector> vvList) {
+ for (ValueVector v : vvList) {
+ fieldVectorMap.put(v.getField(), v);
+ container.add(v);
}
- container.clear();
- fieldVectorMap.clear();
+ schemaChange = true;
}
@Override
- public void setNewSchema() throws SchemaChangeException {
- container.buildSchema(SelectionVectorMode.NONE);
- schema = container.getSchema();
- ScanBatch.this.schemaChanged = true;
+ public void allocate(int recordCount) {
+ for (ValueVector v : fieldVectorMap.values()) {
+ AllocationHelper.allocate(v, recordCount, 50, 10);
+ }
}
- @SuppressWarnings("unchecked")
@Override
- public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
- ValueVector v = TypeHelper.getNewVector(field, oContext.getAllocator());
- if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
- addField(v);
- return (T) v;
+ public boolean isNewSchema() {
+ if (schemaChange == true) {
+ schemaChange = false;
+ return true;
+ }
+ return false;
}
-
}
@Override
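One subtlety in the Mutator above: isNewSchema() is read-and-clear. The first
next() after a vector is added rebuilds the schema and returns OK_NEW_SCHEMA;
subsequent calls return OK until another addField()/addFields() flips the flag
again. A condensed sketch of the per-batch contract this diff establishes (not
a verbatim excerpt):

  mutator.allocate(MAX_RECORD_CNT);        // size all registered vectors
  recordCount = currentReader.next();      // reader may addField() as it reads
  if (mutator.isNewSchema()) {             // true exactly once per change
    container.buildSchema(SelectionVectorMode.NONE);
    schema = container.getSchema();
    return IterOutcome.OK_NEW_SCHEMA;
  }
  return IterOutcome.OK;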
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index bb52a20..37624d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -95,10 +95,9 @@ public class JSONRecordReader2 implements RecordReader{
writer.setValueCount(i);
- mutator.setNewSchema();
return i;
- }catch(IOException | SchemaChangeException e){
+ }catch(IOException e){
throw new DrillRuntimeException("Failure while reading JSON file.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 9e01268..4361262 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -218,7 +218,6 @@ public class HiveRecordReader implements RecordReader {
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
- output.removeAllFields();
try {
for (int i = 0; i < columnNames.size(); i++) {
PrimitiveCategory pCat = primitiveCategories.get(i);
@@ -230,11 +229,9 @@ public class HiveRecordReader implements RecordReader {
for (int i = 0; i < selectedPartitionNames.size(); i++) {
String type = selectedPartitionTypes.get(i);
MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), Types.getMajorTypeFromName(type));
- ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator());
- pVectors.add(vv);
- output.addField(vv);
+ Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+ pVectors.add(output.addField(field, vvClass));
}
- output.setNewSchema();
} catch(SchemaChangeException e) {
throw new ExecutionSetupException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
index ac601d4..c578b5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
@@ -29,8 +29,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.ValueVector;
-
-
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
/**
@@ -80,15 +79,8 @@ public class RowRecordReader implements RecordReader {
// Inform drill of the output columns. They were set up when the vector handler was created.
// Note we are currently working with fixed tables.
- try {
- for (ValueVector v: batch.getValueVectors()) {
- output.addField(v);;
- }
- output.setNewSchema();
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException("Failure while setting up fields", e);
- }
-
+ output.addFields(batch.getValueVectors());
+
// Estimate the number of records we can hold in a RecordBatch
maxRowCount = batch.getEstimatedRowCount(bufSize);
}
@@ -101,9 +93,6 @@ public class RowRecordReader implements RecordReader {
@Override
public int next() {
- // Make note are are starting a new batch of records
- batch.beginBatch(maxRowCount);
-
// Repeat until out of data or vectors are full
int actualCount;
for (actualCount = 0; actualCount < maxRowCount && provider.hasNext(); actualCount++) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 5c07dc5..c7fc939 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.mock;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
@@ -34,6 +35,8 @@ import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import java.util.List;
+
public class MockRecordReader implements RecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
@@ -76,9 +79,10 @@ public class MockRecordReader implements RecordReader {
for (int i = 0; i < config.getTypes().length; i++) {
MajorType type = config.getTypes()[i].getMajorType();
- valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
+ Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+ valueVectors[i] = output.addField(field, vvClass);
}
- output.setNewSchema();
} catch (SchemaChangeException e) {
throw new ExecutionSetupException("Failure while setting up fields", e);
}
@@ -93,7 +97,6 @@ public class MockRecordReader implements RecordReader {
recordsRead += recordSetSize;
for(ValueVector v : valueVectors){
- AllocationHelper.allocate(v, recordSetSize, 50, 10);
// logger.debug(String.format("MockRecordReader: Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
ValueVector.Mutator m = v.getMutator();
@@ -105,14 +108,5 @@ public class MockRecordReader implements RecordReader {
@Override
public void cleanup() {
- for (int i = 0; i < valueVectors.length; i++) {
- try {
- output.removeField(valueVectors[i].getField());
- } catch (SchemaChangeException e) {
- logger.warn("Failure while trying to remove field.", e);
- }
- valueVectors[i].close();
- }
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4ca13a5..9cdd205 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -274,14 +274,6 @@ public class ParquetRecordReader implements RecordReader {
} catch (SchemaChangeException e) {
throw new ExecutionSetupException(e);
}
-
-// output.removeAllFields();
- try {
- output.setNewSchema();
- }catch(SchemaChangeException e) {
- throw new ExecutionSetupException("Error setting up output mutator.", e);
- }
-
}
private SchemaPath toFieldName(String[] paths) {
@@ -298,15 +290,12 @@ public class ParquetRecordReader implements RecordReader {
private void resetBatch() {
for (ColumnReader column : columnStatuses) {
- AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5);
column.valuesReadInCurrentPass = 0;
}
for (VarLengthColumn r : varLengthReader.columns){
- AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
r.valuesReadInCurrentPass = 0;
}
for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
- AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
r.valuesReadInCurrentPass = 0;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index fc5a9b4..1ebd1f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -89,8 +89,6 @@ public class PojoRecordReader<T> implements RecordReader{
}
writers[i].init(output);
}
-
- output.setNewSchema();
}catch(SchemaChangeException e){
throw new ExecutionSetupException("Failure while setting up schema for PojoRecordReader.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index ef05b4c..5c3d381 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
@@ -95,12 +96,9 @@ public class DrillTextRecordReader implements RecordReader {
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
- output.removeAllFields();
MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
- vector = new RepeatedVarCharVector(field, context.getAllocator());
try {
- output.addField(vector);
- output.setNewSchema();
+ vector = output.addField(field, RepeatedVarCharVector.class);
} catch (SchemaChangeException e) {
throw new ExecutionSetupException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
index 37369da..0161632 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -56,18 +57,8 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
fieldVectorMap.put(vector.getField(), vector);
}
- @Override
- public void removeAllFields() {
- for (VectorWrapper<?> vw : container) {
- vw.clear();
- }
- container.clear();
- fieldVectorMap.clear();
- }
-
- @Override
- public void setNewSchema() throws SchemaChangeException {
- container.buildSchema(SelectionVectorMode.NONE);
+ public void addFields(List<ValueVector> v) {
+ return;
}
public Iterator<VectorWrapper<?>> iterator() {
@@ -75,7 +66,17 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
}
public void clear(){
- removeAllFields();
+
+ }
+
+ @Override
+ public boolean isNewSchema() {
+ return false;
+ }
+
+ @Override
+ public void allocate(int recordCount) {
+ return;
}
@Override
@@ -85,5 +86,4 @@ public class TestOutputMutator implements OutputMutator, Iterable<VectorWrapper<
addField(v);
return (T) v;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
index 3b8b57b..8eadb56 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
@@ -27,6 +27,7 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -138,18 +139,28 @@ public class TestOrphanSchema extends ExecTest {
vectors.add(vector);
}
+ public void addFields(List<ValueVector> v) {
+ return;
+ }
+
public Object get(int column, int row) {
return vectors.get(column).getAccessor().getObject(row);
}
- public void removeField(MaterializedField field) {}
- public void removeAllFields() {}
- public void setNewSchema() {}
-
@Override
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
return null;
}
+
+ @Override
+ public void allocate(int recordCount) {
+ return;
+ }
+
+ @Override
+ public boolean isNewSchema() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
index 8da1ea4..217b792 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
@@ -143,22 +143,28 @@ public class TestTableProvider extends ExecTest {
static class TestOutput implements OutputMutator {
List<ValueVector> vectors = new ArrayList<ValueVector>();
- public void addField(ValueVector vector) throws SchemaChangeException {
- vectors.add(vector);
- }
-
public Object get(int column, int row) {
return vectors.get(column).getAccessor().getObject(row);
}
- public void removeField(MaterializedField field) {}
- public void removeAllFields() {}
- public void setNewSchema() {}
-
@Override
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
return null;
}
+
+ @Override
+ public void addFields(List<ValueVector> vv) {
+ return;
+ }
+
+ @Override
+ public void allocate(int recordCount) {
+
+ }
+ @Override
+ public boolean isNewSchema() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/828a5c69/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 5d2c859..e594441 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -196,36 +196,33 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
List<MaterializedField> removedFields = Lists.newArrayList();
List<ValueVector> addFields = Lists.newArrayList();
- @Override
- public void removeField(MaterializedField field) throws SchemaChangeException {
- removedFields.add(field);
+
+ List<MaterializedField> getRemovedFields() {
+ return removedFields;
}
- @Override
- public void addField(ValueVector vector) throws SchemaChangeException {
- addFields.add(vector);
+ List<ValueVector> getAddFields() {
+ return addFields;
}
@Override
- public void removeAllFields() {
- addFields.clear();
+ public void addFields(List<ValueVector> vv) {
+ return;
}
@Override
- public void setNewSchema() throws SchemaChangeException {
+ public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
+ return null;
}
- List<MaterializedField> getRemovedFields() {
- return removedFields;
- }
+ @Override
+ public void allocate(int recordCount) {
- List<ValueVector> getAddFields() {
- return addFields;
}
@Override
- public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
- return null;
+ public boolean isNewSchema() {
+ return false;
}
}
[07/14] git commit: DRILL-588: Ignore leading zeroes while
determining if digits will fit in a given precision
Posted by ja...@apache.org.
DRILL-588: Ignore leading zeroes while determining if digits will fit in a given precision
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c40735ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c40735ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c40735ed
Branch: refs/heads/master
Commit: c40735ed0e582aba66d27b00a929bae173e8a6a9
Parents: 492ec59
Author: Mehant Baid <me...@gmail.com>
Authored: Mon May 19 11:32:59 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 11:32:59 2014 -0700
----------------------------------------------------------------------
.../templates/Decimal/CastVarCharDecimal.java | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c40735ed/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
index e3eb973..8441298 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastVarCharDecimal.java
@@ -83,6 +83,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
*/
int integerStartIndex = readIndex;
int integerEndIndex = endIndex;
+ boolean leadingDigitFound = false;
int radix = 10;
@@ -108,6 +109,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
byte[] buf = new byte[in.end - in.start];
in.buffer.getBytes(in.start, buf, 0, in.end - in.start);
throw new org.apache.drill.common.exceptions.DrillRuntimeException(new String(buf, com.google.common.base.Charsets.UTF_8));
+ } else if (leadingDigitFound == false) {
+ if (next == 0) {
+ // Ignore the leading zeroes while validating if input digits will fit within the given precision
+ integerStartIndex++;
+ } else {
+ leadingDigitFound = true;
+ }
}
out.value *= radix;
out.value += next;
@@ -215,7 +223,8 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
startIndex = readIndex;
int radix = 10;
-
+ boolean leadingDigitFound = false;
+
/* This is the first pass, we get the number of integer digits and based on the provided scale
* we compute which index into the ByteBuf we start storing the integer part of the Decimal
*/
@@ -243,7 +252,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
throw new NumberFormatException(new String(buf, com.google.common.base.Charsets.UTF_8));
}
- integerDigits++;
+ if (leadingDigitFound == false && next != 0) {
+ leadingDigitFound = true;
+ }
+
+ if (leadingDigitFound == true) {
+ integerDigits++;
+ }
}
}
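With the guard in place, a literal such as "000123.45" is validated as three
integer digits rather than six, so it fits a DECIMAL(5, 2) instead of being
rejected. A standalone sketch of the same rule (a hypothetical helper, not the
generated template code; assumes the input has already been validated as
digits plus an optional decimal point):

  static int significantIntegerDigits(String s) {
    boolean leadingDigitFound = false;
    int integerDigits = 0;
    for (char c : s.toCharArray()) {
      if (c == '.') break;              // count the integer part only
      if (!leadingDigitFound && c != '0') {
        leadingDigitFound = true;       // first non-zero integer digit
      }
      if (leadingDigitFound) {
        integerDigits++;                // leading zeroes never count
      }
    }
    return integerDigits;
  }

  // significantIntegerDigits("000123.45") == 3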
[02/14] git commit: Fix alignment in Hash Join code
Posted by ja...@apache.org.
Fix alignment in Hash Join code
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cb0d46f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cb0d46f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cb0d46f6
Branch: refs/heads/master
Commit: cb0d46f69df2d89601f56b257f3f6c1d97eedc6e
Parents: 1f4276e
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 01:52:03 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Sun May 18 01:52:03 2014 -0700
----------------------------------------------------------------------
.../exec/physical/impl/join/HashJoinBatch.java | 634 +++++++++----------
.../impl/join/HashJoinBatchCreator.java | 10 +-
.../exec/physical/impl/join/HashJoinHelper.java | 278 ++++----
.../exec/physical/impl/join/HashJoinProbe.java | 38 +-
.../impl/join/HashJoinProbeTemplate.java | 310 ++++-----
5 files changed, 635 insertions(+), 635 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5eec3bb..9afc033 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -59,397 +59,397 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
- // Probe side record batch
- private final RecordBatch left;
+ // Probe side record batch
+ private final RecordBatch left;
- // Build side record batch
- private final RecordBatch right;
+ // Build side record batch
+ private final RecordBatch right;
- // Join type, INNER, LEFT, RIGHT or OUTER
- private final JoinRelType joinType;
+ // Join type, INNER, LEFT, RIGHT or OUTER
+ private final JoinRelType joinType;
- // Join conditions
- private final List<JoinCondition> conditions;
+ // Join conditions
+ private final List<JoinCondition> conditions;
- // Runtime generated class implementing HashJoinProbe interface
- private HashJoinProbe hashJoinProbe = null;
+ // Runtime generated class implementing HashJoinProbe interface
+ private HashJoinProbe hashJoinProbe = null;
- /* Helper class
- * Maintains linked list of build side records with the same key
- * Keeps information about which build records have a corresponding
- * matching key in the probe side (for outer, right joins)
- */
- private HashJoinHelper hjHelper = null;
+ /* Helper class
+ * Maintains linked list of build side records with the same key
+ * Keeps information about which build records have a corresponding
+ * matching key in the probe side (for outer, right joins)
+ */
+ private HashJoinHelper hjHelper = null;
- // Underlying hashtable used by the hash join
- private HashTable hashTable = null;
+ // Underlying hashtable used by the hash join
+ private HashTable hashTable = null;
- /* Hyper container to store all build side record batches.
- * Records are retrieved from this container when there is a matching record
- * on the probe side
- */
- private ExpandableHyperContainer hyperContainer;
+ /* Hyper container to store all build side record batches.
+ * Records are retrieved from this container when there is a matching record
+ * on the probe side
+ */
+ private ExpandableHyperContainer hyperContainer;
- // Number of records in the output container
- private int outputRecords;
+ // Number of records in the output container
+ private int outputRecords;
- // Current batch index on the build side
- private int buildBatchIndex = 0;
+ // Current batch index on the build side
+ private int buildBatchIndex = 0;
- // List of vector allocators
- private List<VectorAllocator> allocators = null;
+ // List of vector allocators
+ private List<VectorAllocator> allocators = null;
- // Schema of the build side
- private BatchSchema rightSchema = null;
+ // Schema of the build side
+ private BatchSchema rightSchema = null;
- // Generator mapping for the build side
- private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
- "projectBuildRecord" /* eval method */,
- null /* reset */, null /* cleanup */);
+ // Generator mapping for the build side
+ private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
+ "projectBuildRecord" /* eval method */,
+ null /* reset */, null /* cleanup */);
- // Generator mapping for the probe side
- private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
- "projectProbeRecord" /* eval method */,
- null /* reset */, null /* cleanup */);
+ // Generator mapping for the probe side
+ private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
+ "projectProbeRecord" /* eval method */,
+ null /* reset */, null /* cleanup */);
- // Mapping set for the build side
- private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
- "buildBatch" /* read container */,
- "outgoing" /* write container */,
- PROJECT_BUILD, PROJECT_BUILD);
+ // Mapping set for the build side
+ private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
+ "buildBatch" /* read container */,
+ "outgoing" /* write container */,
+ PROJECT_BUILD, PROJECT_BUILD);
- // Mapping set for the probe side
- private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
- "probeBatch" /* read container */,
- "outgoing" /* write container */,
- PROJECT_PROBE, PROJECT_PROBE);
+ // Mapping set for the probe side
+ private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+ "probeBatch" /* read container */,
+ "outgoing" /* write container */,
+ PROJECT_PROBE, PROJECT_PROBE);
- // indicates if we have previously returned an output batch
- boolean firstOutputBatch = true;
+ // indicates if we have previously returned an output batch
+ boolean firstOutputBatch = true;
- IterOutcome leftUpstream = IterOutcome.NONE;
-
- @Override
- public int getRecordCount() {
- return outputRecords;
- }
+ IterOutcome leftUpstream = IterOutcome.NONE;
+ @Override
+ public int getRecordCount() {
+ return outputRecords;
+ }
- @Override
- public IterOutcome next() {
- try {
- /* If we are here for the first time, execute the build phase of the
- * hash join and setup the run time generated class for the probe side
- */
- if (hashJoinProbe == null) {
+ @Override
+ public IterOutcome next() {
- // Initialize the hash join helper context
- hjHelper = new HashJoinHelper(context, oContext.getAllocator());
+ try {
+ /* If we are here for the first time, execute the build phase of the
+ * hash join and setup the run time generated class for the probe side
+ */
+ if (hashJoinProbe == null) {
- /* Build phase requires setting up the hash table. Hash table will
- * materialize both the build and probe side expressions while
- * creating the hash table. So we need to invoke next() on our probe batch
- * as well, for the materialization to be successful. This batch will not be used
- * till we complete the build phase.
- */
- leftUpstream = left.next();
+ // Initialize the hash join helper context
+ hjHelper = new HashJoinHelper(context, oContext.getAllocator());
- // Build the hash table, using the build side record batches.
- executeBuildPhase();
+ /* Build phase requires setting up the hash table. Hash table will
+ * materialize both the build and probe side expressions while
+ * creating the hash table. So we need to invoke next() on our probe batch
+ * as well, for the materialization to be successful. This batch will not be used
+ * till we complete the build phase.
+ */
+ leftUpstream = left.next();
- // Create the run time generated code needed to probe and project
- hashJoinProbe = setupHashJoinProbe();
- }
+ // Build the hash table, using the build side record batches.
+ executeBuildPhase();
- // Store the number of records projected
- if (hashTable != null) {
-
- // Allocate the memory for the vectors in the output container
- allocateVectors();
-
- outputRecords = hashJoinProbe.probeAndProject();
-
- /* We are here because of one the following
- * 1. Completed processing of all the records and we are done
- * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
- * Either case build the output container's schema and return
- */
- if (outputRecords > 0) {
-
- // Build the container schema and set the counts
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- container.setRecordCount(outputRecords);
-
- for (VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(outputRecords);
- }
-
- // First output batch, return OK_NEW_SCHEMA
- if (firstOutputBatch == true) {
- firstOutputBatch = false;
- return IterOutcome.OK_NEW_SCHEMA;
- }
-
- // Not the first output batch
- return IterOutcome.OK;
- }
- } else {
- // Our build side is empty, we won't have any matches, clear the probe side
- if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (VectorWrapper<?> wrapper : left) {
- wrapper.getValueVector().clear();
- }
- leftUpstream = left.next();
- while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (VectorWrapper<?> wrapper : left) {
- wrapper.getValueVector().clear();
- }
- leftUpstream = left.next();
- }
- }
- }
+ // Create the run time generated code needed to probe and project
+ hashJoinProbe = setupHashJoinProbe();
+ }
- // No more output records, clean up and return
- return IterOutcome.NONE;
+ // Store the number of records projected
+ if (hashTable != null) {
- } catch (ClassTransformationException | SchemaChangeException | IOException e) {
- context.fail(e);
- killIncoming();
- return IterOutcome.STOP;
- }
- }
+ // Allocate the memory for the vectors in the output container
+ allocateVectors();
- public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+ outputRecords = hashJoinProbe.probeAndProject();
- // Setup the hash table configuration object
- int conditionsSize = conditions.size();
+ /* We are here because of one the following
+ * 1. Completed processing of all the records and we are done
+ * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+ * Either case build the output container's schema and return
+ */
+ if (outputRecords > 0) {
- NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
- NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+ // Build the container schema and set the counts
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setRecordCount(outputRecords);
- // Create named expressions from the conditions
- for (int i = 0; i < conditionsSize; i++) {
- rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
- leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+ for (VectorWrapper<?> v : container) {
+ v.getValueVector().getMutator().setValueCount(outputRecords);
+ }
- // Hash join only supports equality currently.
- assert conditions.get(i).getRelationship().equals("==");
+ // First output batch, return OK_NEW_SCHEMA
+ if (firstOutputBatch == true) {
+ firstOutputBatch = false;
+ return IterOutcome.OK_NEW_SCHEMA;
}
- // Set the left named expression to be null if the probe batch is empty.
- if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
- leftExpr = null;
- } else {
- if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+ // Not the first output batch
+ return IterOutcome.OK;
+ }
+ } else {
+ // Our build side is empty, we won't have any matches, clear the probe side
+ if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+ for (VectorWrapper<?> wrapper : left) {
+ wrapper.getValueVector().clear();
+ }
+ leftUpstream = left.next();
+ while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
+ for (VectorWrapper<?> wrapper : left) {
+ wrapper.getValueVector().clear();
+ }
+ leftUpstream = left.next();
}
}
+ }
- HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
+ // No more output records, clean up and return
+ return IterOutcome.NONE;
- // Create the chained hash table
- ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
- hashTable = ht.createAndSetupHashTable(null);
+ } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+ context.fail(e);
+ killIncoming();
+ return IterOutcome.STOP;
}
+ }
- public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
-
- //Setup the underlying hash table
- IterOutcome rightUpstream = right.next();
-
- boolean moreData = true;
-
- while (moreData) {
-
- switch (rightUpstream) {
-
- case NONE:
- case NOT_YET:
- case STOP:
- moreData = false;
- continue;
-
- case OK_NEW_SCHEMA:
- if (rightSchema == null) {
- rightSchema = right.getSchema();
-
- if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
- }
- setupHashTable();
- } else {
- throw new SchemaChangeException("Hash join does not support schema changes");
- }
- // Fall through
- case OK:
- int currentRecordCount = right.getRecordCount();
-
- /* For every new build batch, we store some state in the helper context
- * Add new state to the helper context
- */
- hjHelper.addNewBatch(currentRecordCount);
-
- // Holder contains the global index where the key is hashed into using the hash table
- IntHolder htIndex = new IntHolder();
-
- // For every record in the build batch, hash the key columns
- for (int i = 0; i < currentRecordCount; i++) {
-
- HashTable.PutStatus status = hashTable.put(i, htIndex);
-
- if (status != HashTable.PutStatus.PUT_FAILED) {
- /* Use the global index returned by the hash table, to store
- * the current record index and batch index. This will be used
- * later when we probe and find a match.
- */
- hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
- }
- }
-
- /* Completed hashing all records in this batch. Transfer the batch
- * to the hyper vector container. Will be used when we want to retrieve
- * records that have matching keys on the probe side.
- */
- RecordBatchData nextBatch = new RecordBatchData(right);
- if (hyperContainer == null) {
- hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
- } else {
- hyperContainer.addBatch(nextBatch.getContainer());
- }
-
- // completed processing a batch, increment batch index
- buildBatchIndex++;
- break;
- }
- // Get the next record batch
- rightUpstream = right.next();
- }
+ public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {
+
+ // Setup the hash table configuration object
+ int conditionsSize = conditions.size();
+
+ NamedExpression rightExpr[] = new NamedExpression[conditionsSize];
+ NamedExpression leftExpr[] = new NamedExpression[conditionsSize];
+
+ // Create named expressions from the conditions
+ for (int i = 0; i < conditionsSize; i++) {
+ rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
+ leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));
+
+ // Hash join only supports equality currently.
+ assert conditions.get(i).getRelationship().equals("==");
}
- public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+ // Set the left named expression to be null if the probe batch is empty.
+ if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
+ leftExpr = null;
+ } else {
+ if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+ throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
+ }
+ }
- allocators = new ArrayList<>();
+ HashTableConfig htConfig = new HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
- final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<HashJoinProbe> g = cg.getRoot();
+ // Create the chained hash table
+ ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
+ hashTable = ht.createAndSetupHashTable(null);
+ }
- // Generate the code to project build side records
- g.setMappingSet(projectBuildMapping);
+ public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
+ // Set up the underlying hash table
+ IterOutcome rightUpstream = right.next();
- int fieldId = 0;
- JExpression buildIndex = JExpr.direct("buildIndex");
- JExpression outIndex = JExpr.direct("outIndex");
- g.rotateBlock();
+ boolean moreData = true;
- if (hyperContainer != null) {
- for(VectorWrapper<?> vv : hyperContainer) {
+ while (moreData) {
- MajorType inputType = vv.getField().getType();
- MajorType outputType;
- if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
- outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
- } else {
- outputType = inputType;
- }
+ switch (rightUpstream) {
- // Add the vector to our output container
- ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
- container.add(v);
- allocators.add(RemovingRecordBatch.getAllocator4(v));
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ moreData = false;
+ continue;
- JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
- g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
- .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
- .arg(outIndex)
- .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
+ case OK_NEW_SCHEMA:
+ if (rightSchema == null) {
+ rightSchema = right.getSchema();
- fieldId++;
+ if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
+ throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
}
- }
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.TRUE);
+ setupHashTable();
+ } else {
+ throw new SchemaChangeException("Hash join does not support schema changes");
+ }
+ // Fall through
+ case OK:
+ int currentRecordCount = right.getRecordCount();
- // Generate the code to project probe side records
- g.setMappingSet(projectProbeMapping);
+ /* For every new build batch, we store some state in the helper context
+ * Add new state to the helper context
+ */
+ hjHelper.addNewBatch(currentRecordCount);
- int outputFieldId = fieldId;
- fieldId = 0;
- JExpression probeIndex = JExpr.direct("probeIndex");
- int recordCount = 0;
+ // Holder contains the global index where the key is hashed into using the hash table
+ IntHolder htIndex = new IntHolder();
- if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
- for (VectorWrapper<?> vv : left) {
+ // For every record in the build batch, hash the key columns
+ for (int i = 0; i < currentRecordCount; i++) {
- MajorType inputType = vv.getField().getType();
- MajorType outputType;
- if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
- outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
- } else {
- outputType = inputType;
- }
+ HashTable.PutStatus status = hashTable.put(i, htIndex);
- ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
- container.add(v);
- allocators.add(RemovingRecordBatch.getAllocator4(v));
+ if (status != HashTable.PutStatus.PUT_FAILED) {
+ /* Use the global index returned by the hash table, to store
+ * the current record index and batch index. This will be used
+ * later when we probe and find a match.
+ */
+ hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
+ }
+ }
- JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
+ /* Completed hashing all records in this batch. Transfer the batch
+ * to the hyper vector container. Will be used when we want to retrieve
+ * records that have matching keys on the probe side.
+ */
+ RecordBatchData nextBatch = new RecordBatchData(right);
+ if (hyperContainer == null) {
+ hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+ } else {
+ hyperContainer.addBatch(nextBatch.getContainer());
+ }
- g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+ // completed processing a batch, increment batch index
+ buildBatchIndex++;
+ break;
+ }
+ // Get the next record batch
+ rightUpstream = right.next();
+ }
+ }
- fieldId++;
- outputFieldId++;
- }
- g.rotateBlock();
- g.getEvalBlock()._return(JExpr.TRUE);
+ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+
+ allocators = new ArrayList<>();
+
+ final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ ClassGenerator<HashJoinProbe> g = cg.getRoot();
+
+ // Generate the code to project build side records
+ g.setMappingSet(projectBuildMapping);
+
+
+ int fieldId = 0;
+ JExpression buildIndex = JExpr.direct("buildIndex");
+ JExpression outIndex = JExpr.direct("outIndex");
+ g.rotateBlock();
- recordCount = left.getRecordCount();
+ if (hyperContainer != null) {
+ for(VectorWrapper<?> vv : hyperContainer) {
+
+ MajorType inputType = vv.getField().getType();
+ MajorType outputType;
+ if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+ outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+ } else {
+ outputType = inputType;
}
+ // Add the vector to our output container
+ ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
+ container.add(v);
+ allocators.add(RemovingRecordBatch.getAllocator4(v));
- HashJoinProbe hj = context.getImplementationClass(cg);
+ JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
+ g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
+ .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+ .arg(outIndex)
+ .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
- hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
- return hj;
+ fieldId++;
+ }
}
+ g.rotateBlock();
+ g.getEvalBlock()._return(JExpr.TRUE);
+
+ // Generate the code to project probe side records
+ g.setMappingSet(projectProbeMapping);
- private void allocateVectors(){
- for(VectorAllocator a : allocators){
- a.alloc(RecordBatch.MAX_BATCH_SIZE);
+ int outputFieldId = fieldId;
+ fieldId = 0;
+ JExpression probeIndex = JExpr.direct("probeIndex");
+ int recordCount = 0;
+
+ if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ for (VectorWrapper<?> vv : left) {
+
+ MajorType inputType = vv.getField().getType();
+ MajorType outputType;
+ if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+ outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+ } else {
+ outputType = inputType;
}
- }
- public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
- super(popConfig, context);
- this.left = left;
- this.right = right;
- this.joinType = popConfig.getJoinType();
- this.conditions = popConfig.getConditions();
- }
+ ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
+ container.add(v);
+ allocators.add(RemovingRecordBatch.getAllocator4(v));
+
+ JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
- @Override
- public void killIncoming() {
- this.left.kill();
- this.right.kill();
+ g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+
+ fieldId++;
+ outputFieldId++;
+ }
+ g.rotateBlock();
+ g.getEvalBlock()._return(JExpr.TRUE);
+
+ recordCount = left.getRecordCount();
}
- @Override
- public void cleanup() {
- hjHelper.clear();
- // If we didn't receive any data, hyperContainer may be null, check before clearing
- if (hyperContainer != null) {
- hyperContainer.clear();
- }
+ HashJoinProbe hj = context.getImplementationClass(cg);
- if (hashTable != null) {
- hashTable.clear();
- }
- super.cleanup();
- left.cleanup();
- right.cleanup();
+ hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
+ return hj;
+ }
+
+ private void allocateVectors(){
+ for(VectorAllocator a : allocators){
+ a.alloc(RecordBatch.MAX_BATCH_SIZE);
+ }
+ }
+
+ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
+ super(popConfig, context);
+ this.left = left;
+ this.right = right;
+ this.joinType = popConfig.getJoinType();
+ this.conditions = popConfig.getConditions();
+ }
+
+ @Override
+ public void killIncoming() {
+ this.left.kill();
+ this.right.kill();
+ }
+
+ @Override
+ public void cleanup() {
+ hjHelper.clear();
+
+ // If we didn't receive any data, hyperContainer may be null, check before clearing
+ if (hyperContainer != null) {
+ hyperContainer.clear();
+ }
+
+ if (hashTable != null) {
+ hashTable.clear();
}
+ super.cleanup();
+ left.cleanup();
+ right.cleanup();
+ }
}
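
Taken together, the restructured innerNext() above follows a simple outcome protocol: while probeAndProject() produces rows, the first non-empty batch is announced with OK_NEW_SCHEMA, subsequent ones with OK, and NONE once everything is drained. A minimal, self-contained sketch of that protocol follows; the enum and the emit() helper are illustrative stand-ins, not Drill's RecordBatch API.

    public class OutcomeSketch {
      enum IterOutcome { OK_NEW_SCHEMA, OK, NONE }

      static boolean firstOutputBatch = true;

      // Mirrors the branch in innerNext(): the first non-empty batch announces the schema.
      static IterOutcome emit(int outputRecords) {
        if (outputRecords > 0) {
          if (firstOutputBatch) {
            firstOutputBatch = false;
            return IterOutcome.OK_NEW_SCHEMA;
          }
          return IterOutcome.OK;
        }
        return IterOutcome.NONE;
      }

      public static void main(String[] args) {
        System.out.println(emit(5)); // OK_NEW_SCHEMA
        System.out.println(emit(3)); // OK
        System.out.println(emit(0)); // NONE
      }
    }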
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index 19a4a29..d925958 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -29,9 +29,9 @@ import java.util.List;
public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
- @Override
- public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.size() == 2);
- return new HashJoinBatch(config, context, children.get(0), children.get(1));
- }
+ @Override
+ public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 2);
+ return new HashJoinBatch(config, context, children.get(0), children.get(1));
+ }
}
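
The creator's only job is to validate that the hash join receives exactly two children (build and probe) and hand them to HashJoinBatch. A small sketch of the same precondition check without Guava; the names here are illustrative.

    import java.util.Arrays;
    import java.util.List;

    public class CreatorSketch {
      // Equivalent of Preconditions.checkArgument(children.size() == 2).
      static void checkTwoChildren(List<?> children) {
        if (children.size() != 2) {
          throw new IllegalArgumentException(
              "hash join expects exactly 2 children, got " + children.size());
        }
      }

      public static void main(String[] args) {
        checkTwoChildren(Arrays.asList("probe", "build")); // passes silently
      }
    }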
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index b1ed07e..a634827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -51,183 +51,183 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
*/
public class HashJoinHelper {
- /* List of start indexes. Stores the record and batch index of the first record
- * with a given key.
- */
- List<SelectionVector4> startIndices = new ArrayList<>();
-
- // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
- List<BuildInfo> buildInfoList = new ArrayList<>();
+ /* List of start indexes. Stores the record and batch index of the first record
+ * with a given key.
+ */
+ List<SelectionVector4> startIndices = new ArrayList<>();
- // Fragment context
- FragmentContext context;
- BufferAllocator allocator;
+ // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
+ List<BuildInfo> buildInfoList = new ArrayList<>();
- // Constant to indicate index is empty.
- static final int INDEX_EMPTY = -1;
+ // Fragment context
+ FragmentContext context;
+ BufferAllocator allocator;
- // bits to shift while obtaining batch index from SV4
- static final int SHIFT_SIZE = 16;
+ // Constant to indicate index is empty.
+ static final int INDEX_EMPTY = -1;
- public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
- this.context = context;
- this.allocator = allocator;
- }
+ // bits to shift while obtaining batch index from SV4
+ static final int SHIFT_SIZE = 16;
- public void addStartIndexBatch() throws SchemaChangeException {
- startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
- }
+ public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
+ this.context = context;
+ this.allocator = allocator;
+}
- public class BuildInfo {
- // List of links. Logically it helps maintain a linked list of records with the same key value
- private SelectionVector4 links;
+ public void addStartIndexBatch() throws SchemaChangeException {
+ startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
+ }
- // Bit vector that keeps track of which records on the build side matched a record on the probe side
- private BitSet keyMatchBitVector;
+ public class BuildInfo {
+ // List of links. Logically it helps maintain a linked list of records with the same key value
+ private SelectionVector4 links;
- // number of records in this batch
- private int recordCount;
+ // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
+ private BitSet keyMatchBitVector;
- public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
- this.links = links;
- this.keyMatchBitVector = keyMatchBitVector;
- this.recordCount = recordCount;
- }
+ // number of records in this batch
+ private int recordCount;
- public SelectionVector4 getLinks() {
- return links;
- }
+ public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
+ this.links = links;
+ this.keyMatchBitVector = keyMatchBitVector;
+ this.recordCount = recordCount;
+ }
- public BitSet getKeyMatchBitVector() {
- return keyMatchBitVector;
- }
+ public SelectionVector4 getLinks() {
+ return links;
}
- public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
+ public BitSet getKeyMatchBitVector() {
+ return keyMatchBitVector;
+ }
+ }
- ByteBuf vector = allocator.buffer((recordCount * 4));
+ public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
- SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
+ ByteBuf vector = allocator.buffer((recordCount * 4));
- // Initialize the vector
- for (int i = 0; i < recordCount; i++) {
- sv4.set(i, INDEX_EMPTY);
- }
+ SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
- return sv4;
+ // Initialize the vector
+ for (int i = 0; i < recordCount; i++) {
+ sv4.set(i, INDEX_EMPTY);
}
- public void addNewBatch(int recordCount) throws SchemaChangeException {
- // Add a node to the list of BuildInfos
- BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
- buildInfoList.add(info);
- }
+ return sv4;
+ }
- public int getStartIndex(int keyIndex) {
- int batchIdx = keyIndex / HashTable.BATCH_SIZE;
- int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+ public void addNewBatch(int recordCount) throws SchemaChangeException {
+ // Add a node to the list of BuildInfos
+ BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
+ buildInfoList.add(info);
+ }
- assert batchIdx < startIndices.size();
+ public int getStartIndex(int keyIndex) {
+ int batchIdx = keyIndex / HashTable.BATCH_SIZE;
+ int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
- SelectionVector4 sv4 = startIndices.get(batchIdx);
+ assert batchIdx < startIndices.size();
- return sv4.get(offsetIdx);
- }
+ SelectionVector4 sv4 = startIndices.get(batchIdx);
- public int getNextIndex(int currentIdx) {
- // Get to the links field of the current index to get the next index
- int batchIdx = currentIdx >>> SHIFT_SIZE;
- int recordIdx = currentIdx & HashTable.BATCH_MASK;
+ return sv4.get(offsetIdx);
+ }
- assert batchIdx < buildInfoList.size();
+ public int getNextIndex(int currentIdx) {
+ // Get to the links field of the current index to get the next index
+ int batchIdx = currentIdx >>> SHIFT_SIZE;
+ int recordIdx = currentIdx & HashTable.BATCH_MASK;
- // Get the corresponding BuildInfo node
- BuildInfo info = buildInfoList.get(batchIdx);
- return info.getLinks().get(recordIdx);
- }
+ assert batchIdx < buildInfoList.size();
- public List<Integer> getNextUnmatchedIndex() {
- List<Integer> compositeIndexes = new ArrayList<>();
+ // Get the corresponding BuildInfo node
+ BuildInfo info = buildInfoList.get(batchIdx);
+ return info.getLinks().get(recordIdx);
+ }
- for (int i = 0; i < buildInfoList.size(); i++) {
- BuildInfo info = buildInfoList.get(i);
- int fromIndex = 0;
+ public List<Integer> getNextUnmatchedIndex() {
+ List<Integer> compositeIndexes = new ArrayList<>();
- while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
- compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
- fromIndex++;
- }
- }
- return compositeIndexes;
+ for (int i = 0; i < buildInfoList.size(); i++) {
+ BuildInfo info = buildInfoList.get(i);
+ int fromIndex = 0;
+
+ while (((fromIndex = info.getKeyMatchBitVector().nextClearBit(fromIndex)) != -1) && (fromIndex < info.recordCount)) {
+ compositeIndexes.add((i << SHIFT_SIZE) | (fromIndex & HashTable.BATCH_MASK));
+ fromIndex++;
+ }
}
+ return compositeIndexes;
+ }
- public void setRecordMatched(int index) {
- int batchIdx = index >>> SHIFT_SIZE;
- int recordIdx = index & HashTable.BATCH_MASK;
+ public void setRecordMatched(int index) {
+ int batchIdx = index >>> SHIFT_SIZE;
+ int recordIdx = index & HashTable.BATCH_MASK;
- // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
- BuildInfo info = buildInfoList.get(batchIdx);
- BitSet bitVector = info.getKeyMatchBitVector();
+ // Get the BitVector for the appropriate batch and set the bit to indicate the record matched
+ BuildInfo info = buildInfoList.get(batchIdx);
+ BitSet bitVector = info.getKeyMatchBitVector();
- bitVector.set(recordIdx);
- }
+ bitVector.set(recordIdx);
+ }
- public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
+ public void setCurrentIndex(int keyIndex, int batchIndex, int recordIndex) throws SchemaChangeException {
- /* set the current record batch index and the index
- * within the batch at the specified keyIndex. The keyIndex
- * denotes the global index where the key for this record is
- * stored in the hash table
+ /* set the current record batch index and the index
+ * within the batch at the specified keyIndex. The keyIndex
+ * denotes the global index where the key for this record is
+ * stored in the hash table
+ */
+ int batchIdx = keyIndex / HashTable.BATCH_SIZE;
+ int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
+
+ if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
+ // allocate a new batch
+ addStartIndexBatch();
+ }
+
+ SelectionVector4 startIndex = startIndices.get(batchIdx);
+ int linkIndex;
+
+ // If head of the list is empty, insert current index at this position
+ if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
+ startIndex.set(offsetIdx, batchIndex, recordIndex);
+ } else {
+ /* Head of this list is not empty, if the first link
+ * is empty insert there
+ */
+ batchIdx = linkIndex >>> SHIFT_SIZE;
+ offsetIdx = linkIndex & Character.MAX_VALUE;
+
+ SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
+ int firstLink = link.get(offsetIdx);
+
+ if (firstLink == INDEX_EMPTY) {
+ link.set(offsetIdx, batchIndex, recordIndex);
+ } else {
+ /* Insert the current value as the first link and
+ * make the current first link as its next
*/
- int batchIdx = keyIndex / HashTable.BATCH_SIZE;
- int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
-
- if (keyIndex >= (HashTable.BATCH_SIZE * startIndices.size())) {
- // allocate a new batch
- addStartIndexBatch();
- }
-
- SelectionVector4 startIndex = startIndices.get(batchIdx);
- int linkIndex;
-
- // If head of the list is empty, insert current index at this position
- if ((linkIndex = (startIndex.get(offsetIdx))) == INDEX_EMPTY) {
- startIndex.set(offsetIdx, batchIndex, recordIndex);
- } else {
- /* Head of this list is not empty, if the first link
- * is empty insert there
- */
- batchIdx = linkIndex >>> SHIFT_SIZE;
- offsetIdx = linkIndex & Character.MAX_VALUE;
-
- SelectionVector4 link = buildInfoList.get(batchIdx).getLinks();
- int firstLink = link.get(offsetIdx);
-
- if (firstLink == INDEX_EMPTY) {
- link.set(offsetIdx, batchIndex, recordIndex);
- } else {
- /* Insert the current value as the first link and
- * make the current first link as its next
- */
- int firstLinkBatchIdx = firstLink >>> SHIFT_SIZE;
- int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
-
- SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
- nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
-
- link.set(offsetIdx, batchIndex, recordIndex);
- }
- }
+ int firstLinkBatchIdx = firstLink >>> SHIFT_SIZE;
+ int firstLinkOffsetIDx = firstLink & Character.MAX_VALUE;
+
+ SelectionVector4 nextLink = buildInfoList.get(batchIndex).getLinks();
+ nextLink.set(recordIndex, firstLinkBatchIdx, firstLinkOffsetIDx);
+
+ link.set(offsetIdx, batchIndex, recordIndex);
+ }
}
+ }
- public void clear() {
- // Clear the SV4 used for start indices
- for (SelectionVector4 sv4: startIndices) {
- sv4.clear();
- }
+ public void clear() {
+ // Clear the SV4 used for start indices
+ for (SelectionVector4 sv4: startIndices) {
+ sv4.clear();
+ }
- for (BuildInfo info : buildInfoList) {
- info.getLinks().clear();
- }
+ for (BuildInfo info : buildInfoList) {
+ info.getLinks().clear();
}
+ }
}
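
HashJoinHelper's start indices and links both store composite indexes: the high 16 bits carry a batch index (SHIFT_SIZE = 16) and the low 16 bits a record offset within the batch (masked with Character.MAX_VALUE, i.e. 0xFFFF, the same value as HashTable.BATCH_MASK). A self-contained sketch of that encoding, outside of Drill's classes:

    public class CompositeIndex {
      static final int SHIFT_SIZE = 16;
      static final int BATCH_MASK = Character.MAX_VALUE; // 0xFFFF, the low 16 bits

      // Pack a (batchIndex, recordIndex) pair into one int, mirroring the SV4 layout.
      static int compose(int batchIndex, int recordIndex) {
        return (batchIndex << SHIFT_SIZE) | (recordIndex & BATCH_MASK);
      }

      static int batchIndex(int compositeIdx)  { return compositeIdx >>> SHIFT_SIZE; }
      static int recordIndex(int compositeIdx) { return compositeIdx & BATCH_MASK; }

      public static void main(String[] args) {
        int idx = compose(3, 1234);
        System.out.println(batchIndex(idx) + " / " + recordIndex(idx)); // 3 / 1234
      }
    }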
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 160d352..6d20f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -33,25 +33,25 @@ import org.eigenbase.rel.JoinRelType;
import java.io.IOException;
public interface HashJoinProbe {
- public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+ public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
- /* The probe side of the hash join can be in the following three states
- * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
- * key match and if we do we project the record
- * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
- * from the build side that did not match any records on the probe side. For Left outer
- * case we handle it internally by projecting the record if there isn't a match on the build side
- * 3. DONE: Once we have projected all possible records we are done
- */
- public static enum ProbeState {
- PROBE_PROJECT, PROJECT_RIGHT, DONE
- }
+ /* The probe side of the hash join can be in the following three states
+ * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
+ * key match and if we do we project the record
+ * 2. PROJECT_RIGHT: Right Outer or Full Outer joins where we are projecting the records
+ * from the build side that did not match any records on the probe side. For Left outer
+ * case we handle it internally by projecting the record if there isn't a match on the build side
+ * 3. DONE: Once we have projected all possible records we are done
+ */
+ public static enum ProbeState {
+ PROBE_PROJECT, PROJECT_RIGHT, DONE
+ }
- public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
- int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
- JoinRelType joinRelType);
- public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
- public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
- public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
- public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
+ public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+ int probeRecordCount, RecordBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+ JoinRelType joinRelType);
+ public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+ public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
+ public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
+ public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
}
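
The interface above is the contract for Drill's runtime code generation: TEMPLATE_DEFINITION pairs it with HashJoinProbeTemplate, which carries the driver logic and leaves doSetup() and the two project methods abstract for the generated subclass to fill in per schema. The same pattern, sketched with a hand-written subclass standing in for the generated class (all names here are illustrative):

    public class TemplateSketch {
      // The template owns the loop; the "generated" part is the abstract method.
      static abstract class ProbeTemplate {
        int probeAndProject(int rows) {
          int out = 0;
          for (int i = 0; i < rows; i++) {
            if (projectProbeRecord(i, out)) {
              out++;
            }
          }
          return out;
        }
        // Normally emitted by the code generator for the concrete vector types.
        abstract boolean projectProbeRecord(int probeIndex, int outIndex);
      }

      public static void main(String[] args) {
        ProbeTemplate p = new ProbeTemplate() {
          boolean projectProbeRecord(int probeIndex, int outIndex) {
            return true; // pretend every vector copy succeeds
          }
        };
        System.out.println(p.probeAndProject(4)); // 4
      }
    }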
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cb0d46f6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index a3e3b74..2a8be54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -35,196 +35,196 @@ import java.util.List;
public abstract class HashJoinProbeTemplate implements HashJoinProbe {
- // Probe side record batch
- private RecordBatch probeBatch;
+ // Probe side record batch
+ private RecordBatch probeBatch;
- // Join type, INNER, LEFT, RIGHT or OUTER
- private JoinRelType joinType;
+ // Join type, INNER, LEFT, RIGHT or OUTER
+ private JoinRelType joinType;
- /* Helper class
- * Maintains linked list of build side records with the same key
- * Keeps information about which build records have a corresponding
- * matching key in the probe side (for outer, right joins)
- */
- private HashJoinHelper hjHelper = null;
+ /* Helper class
+ * Maintains linked list of build side records with the same key
+ * Keeps information about which build records have a corresponding
+ * matching key in the probe side (for outer, right joins)
+ */
+ private HashJoinHelper hjHelper = null;
- // Underlying hashtable used by the hash join
- private HashTable hashTable = null;
+ // Underlying hashtable used by the hash join
+ private HashTable hashTable = null;
- // Number of records to process on the probe side
- private int recordsToProcess = 0;
+ // Number of records to process on the probe side
+ private int recordsToProcess = 0;
- // Number of records processed on the probe side
- private int recordsProcessed = 0;
+ // Number of records processed on the probe side
+ private int recordsProcessed = 0;
- // Number of records in the output container
- private int outputRecords;
+ // Number of records in the output container
+ private int outputRecords;
- // Indicate if we should drain the next record from the probe side
- private boolean getNextRecord = true;
+ // Indicate if we should drain the next record from the probe side
+ private boolean getNextRecord = true;
- // Contains both batch idx and record idx of the matching record in the build side
- private int currentCompositeIdx = -1;
+ // Contains both batch idx and record idx of the matching record in the build side
+ private int currentCompositeIdx = -1;
- // Current state the hash join algorithm is in
- private ProbeState probeState = ProbeState.PROBE_PROJECT;
+ // Current state the hash join algorithm is in
+ private ProbeState probeState = ProbeState.PROBE_PROJECT;
- // For outer or right joins, this is a list of unmatched records that need to be projected
- private List<Integer> unmatchedBuildIndexes = null;
+ // For outer or right joins, this is a list of unmatched records that need to be projected
+ private List<Integer> unmatchedBuildIndexes = null;
- @Override
- public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
- int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
- HashJoinHelper hjHelper, JoinRelType joinRelType) {
+ @Override
+ public void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+ int probeRecordCount, RecordBatch outgoing, HashTable hashTable,
+ HashJoinHelper hjHelper, JoinRelType joinRelType) {
- this.probeBatch = probeBatch;
- this.joinType = joinRelType;
- this.recordsToProcess = probeRecordCount;
- this.hashTable = hashTable;
- this.hjHelper = hjHelper;
+ this.probeBatch = probeBatch;
+ this.joinType = joinRelType;
+ this.recordsToProcess = probeRecordCount;
+ this.hashTable = hashTable;
+ this.hjHelper = hjHelper;
- doSetup(context, buildBatch, probeBatch, outgoing);
+ doSetup(context, buildBatch, probeBatch, outgoing);
+ }
+
+ public void executeProjectRightPhase() {
+ while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
+ boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+ assert success;
}
+ }
- public void executeProjectRightPhase() {
- while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
- boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
- assert success;
+ public void executeProbePhase() throws SchemaChangeException {
+ while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+
+ // Check if we have processed all records in this batch; if so, we need to invoke next()
+ if (recordsProcessed == recordsToProcess) {
+
+ // Done processing all records in the previous batch, clean up!
+ for (VectorWrapper<?> wrapper : probeBatch) {
+ wrapper.getValueVector().clear();
}
- }
- public void executeProbePhase() throws SchemaChangeException {
- while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) {
+ IterOutcome leftUpstream = probeBatch.next();
- // Check if we have processed all records in this batch; if so, we need to invoke next()
- if (recordsProcessed == recordsToProcess) {
+ switch (leftUpstream) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ recordsProcessed = 0;
+ recordsToProcess = 0;
+ probeState = ProbeState.DONE;
- // Done processing all records in the previous batch, clean up!
- for (VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
+ // We are done with the probe phase. If it's a RIGHT or a FULL join, get the unmatched indexes from the build side
+ if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
+ probeState = ProbeState.PROJECT_RIGHT;
+ }
- IterOutcome leftUpstream = probeBatch.next();
+ continue;
- switch (leftUpstream) {
- case NONE:
- case NOT_YET:
- case STOP:
- recordsProcessed = 0;
- recordsToProcess = 0;
- probeState = ProbeState.DONE;
+ case OK_NEW_SCHEMA:
+ throw new SchemaChangeException("Hash join does not support schema changes");
+ case OK:
+ recordsToProcess = probeBatch.getRecordCount();
+ recordsProcessed = 0;
+ }
+ }
+ int probeIndex;
- // We are done with the probe phase. If it's a RIGHT or a FULL join, get the unmatched indexes from the build side
- if (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) {
- probeState = ProbeState.PROJECT_RIGHT;
- }
+ // Check if we need to drain the next row in the probe side
+ if (getNextRecord) {
+ probeIndex = hashTable.containsKey(recordsProcessed, true);
- continue;
+ if (probeIndex != -1) {
- case OK_NEW_SCHEMA:
- throw new SchemaChangeException("Hash join does not support schema changes");
- case OK:
- recordsToProcess = probeBatch.getRecordCount();
- recordsProcessed = 0;
- }
- }
- int probeIndex;
-
- // Check if we need to drain the next row in the probe side
- if (getNextRecord) {
- probeIndex = hashTable.containsKey(recordsProcessed, true);
-
- if (probeIndex != -1) {
-
- /* The current probe record has a key that matches. Get the index
- * of the first row in the build side that matches the current key
- */
- currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
-
- /* Record in the build side at currentCompositeIdx has a matching record in the probe
- * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
- * join we keep track of which records we need to project at the end
- */
- hjHelper.setRecordMatched(currentCompositeIdx);
-
- boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
- assert success;
- success = projectProbeRecord(recordsProcessed, outputRecords);
- assert success;
- outputRecords++;
-
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case
- */
- currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- }
- else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
- }
- else { // No matching key
-
- // If we have a left outer join, project the keys
- if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
- projectProbeRecord(recordsProcessed, outputRecords++);
- }
- recordsProcessed++;
- }
+ /* The current probe record has a key that matches. Get the index
+ * of the first row in the build side that matches the current key
+ */
+ currentCompositeIdx = hjHelper.getStartIndex(probeIndex);
+
+ /* Record in the build side at currentCompositeIdx has a matching record in the probe
+ * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
+ * join we keep track of which records we need to project at the end
+ */
+ hjHelper.setRecordMatched(currentCompositeIdx);
+
+ boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+ assert success;
+ success = projectProbeRecord(recordsProcessed, outputRecords);
+ assert success;
+ outputRecords++;
+
+ /* Projected single row from the build side with matching key but there
+ * may be more rows with the same key. Check if that's the case
+ */
+ currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+ if (currentCompositeIdx == -1) {
+ /* We only had one row in the build side that matched the current key
+ * from the probe side. Drain the next row in the probe side.
+ */
+ recordsProcessed++;
}
else {
- hjHelper.setRecordMatched(currentCompositeIdx);
- boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
- assert success;
- projectProbeRecord(recordsProcessed, outputRecords);
- outputRecords++;
-
- currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-
- if (currentCompositeIdx == -1) {
- // We don't have any more rows matching the current key on the build side, move on to the next probe row
- getNextRecord = true;
- recordsProcessed++;
- }
+ /* There is more than one row with the same key on the build side
+ * don't drain more records from the probe side till we have projected
+ * all the rows with this key
+ */
+ getNextRecord = false;
}
}
- }
+ else { // No matching key
- public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
+ // If we have a left outer join, project the keys
+ if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
+ projectProbeRecord(recordsProcessed, outputRecords++);
+ }
+ recordsProcessed++;
+ }
+ }
+ else {
+ hjHelper.setRecordMatched(currentCompositeIdx);
+ boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+ assert success;
+ projectProbeRecord(recordsProcessed, outputRecords);
+ outputRecords++;
+
+ currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+
+ if (currentCompositeIdx == -1) {
+ // We don't have any more rows matching the current key on the build side, move on to the next probe row
+ getNextRecord = true;
+ recordsProcessed++;
+ }
+ }
+ }
+ }
- outputRecords = 0;
+ public int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException {
- if (probeState == ProbeState.PROBE_PROJECT) {
- executeProbePhase();
- }
+ outputRecords = 0;
- if (probeState == ProbeState.PROJECT_RIGHT) {
+ if (probeState == ProbeState.PROBE_PROJECT) {
+ executeProbePhase();
+ }
- // We are here because we have a RIGHT OUTER or a FULL join
- if (unmatchedBuildIndexes == null) {
- // Initialize list of build indexes that didn't match a record on the probe side
- unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
- recordsToProcess = unmatchedBuildIndexes.size();
- recordsProcessed = 0;
- }
+ if (probeState == ProbeState.PROJECT_RIGHT) {
- // Project the list of unmatched records on the build side
- executeProjectRightPhase();
- }
+ // We are here because we have a RIGHT OUTER or a FULL join
+ if (unmatchedBuildIndexes == null) {
+ // Initialize list of build indexes that didn't match a record on the probe side
+ unmatchedBuildIndexes = hjHelper.getNextUnmatchedIndex();
+ recordsToProcess = unmatchedBuildIndexes.size();
+ recordsProcessed = 0;
+ }
- return outputRecords;
+ // Project the list of unmatched records on the build side
+ executeProjectRightPhase();
}
- public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
- @Named("outgoing") RecordBatch outgoing);
- public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
- public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+ return outputRecords;
+ }
+
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
+ @Named("outgoing") RecordBatch outgoing);
+ public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+ public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
}
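
The getNextRecord flag above keeps the probe loop pinned to one probe row while it walks the chain of build-side rows that share its key; only when getNextIndex() returns -1 does the loop advance to the next probe row. A simplified, self-contained sketch of that duplicate-chain walk, with a HashMap standing in for the hash table and helper (all names illustrative):

    import java.util.*;

    public class ProbeSketch {
      public static void main(String[] args) {
        // Build side: key -> chain of build-row ids (stand-in for the helper's links).
        Map<Integer, List<Integer>> built = new HashMap<>();
        built.computeIfAbsent(10, k -> new ArrayList<>()).add(0);
        built.computeIfAbsent(10, k -> new ArrayList<>()).add(1); // duplicate key 10
        built.computeIfAbsent(20, k -> new ArrayList<>()).add(2);

        int[] probeKeys = {10, 30, 20};
        for (int probeRow = 0; probeRow < probeKeys.length; probeRow++) {
          List<Integer> matches = built.get(probeKeys[probeRow]);
          if (matches == null) {
            continue; // a LEFT or FULL join would project the probe row with nulls here
          }
          // Emit one output row per matching build row before advancing the probe
          // row, analogous to looping on getNextIndex() with getNextRecord == false.
          for (int buildRow : matches) {
            System.out.println("probe " + probeRow + " matches build " + buildRow);
          }
        }
      }
    }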
[10/14] git commit: enable multi-phase aggregate by default
Posted by ja...@apache.org.
enable multi-phase aggregate by default
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/4b0d060a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/4b0d060a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/4b0d060a
Branch: refs/heads/master
Commit: 4b0d060a913e8ccb25131c805a8dc0c170ddf191
Parents: 78ae265
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon May 19 17:31:23 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 17:46:55 2014 -0700
----------------------------------------------------------------------
.../exec/planner/physical/PlannerSettings.java | 38 ++++++++++----------
1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/4b0d060a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index e65ef17..18a32af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -32,10 +32,10 @@ public class PlannerSettings implements FrameworkContext{
public static final OptionValidator EXCHANGE = new BooleanValidator("planner.disable_exchanges", false);
public static final OptionValidator HASHAGG = new BooleanValidator("planner.enable_hashagg", true);
- public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);
- public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
- public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
- public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", false);
+ public static final OptionValidator STREAMAGG = new BooleanValidator("planner.enable_streamagg", true);
+ public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin", true);
+ public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin", true);
+ public static final OptionValidator MULTIPHASE = new BooleanValidator("planner.enable_multiphase_agg", true);
public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", false);
public OptionManager options = null;
@@ -49,13 +49,13 @@ public class PlannerSettings implements FrameworkContext{
}
public int numEndPoints() {
- return numEndPoints;
+ return numEndPoints;
}
-
+
public boolean useDefaultCosting() {
return useDefaultCosting;
}
-
+
public void setNumEndPoints(int numEndPoints) {
this.numEndPoints = numEndPoints;
}
@@ -63,35 +63,35 @@ public class PlannerSettings implements FrameworkContext{
public void setUseDefaultCosting(boolean defcost) {
this.useDefaultCosting = defcost;
}
-
+
public boolean isHashAggEnabled() {
- return options.getOption(HASHAGG.getOptionName()).bool_val;
+ return options.getOption(HASHAGG.getOptionName()).bool_val;
}
-
+
public boolean isStreamAggEnabled() {
- return options.getOption(STREAMAGG.getOptionName()).bool_val;
+ return options.getOption(STREAMAGG.getOptionName()).bool_val;
}
-
+
public boolean isHashJoinEnabled() {
return options.getOption(HASHJOIN.getOptionName()).bool_val;
}
-
+
public boolean isMergeJoinEnabled() {
- return options.getOption(MERGEJOIN.getOptionName()).bool_val;
+ return options.getOption(MERGEJOIN.getOptionName()).bool_val;
}
-
+
public boolean isMultiPhaseAggEnabled() {
return options.getOption(MULTIPHASE.getOptionName()).bool_val;
}
-
+
public boolean isBroadcastJoinEnabled() {
- return options.getOption(BROADCAST.getOptionName()).bool_val;
+ return options.getOption(BROADCAST.getOptionName()).bool_val;
}
public int getBroadcastThreshold() {
- return broadcastThreshold;
+ return broadcastThreshold;
}
-
+
@Override
public <T> T unwrap(Class<T> clazz) {
if(clazz == PlannerSettings.class){
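
Each planner switch above is a BooleanValidator carrying an option name and a default, and this commit only flips the default of planner.enable_multiphase_agg from false to true. A stand-alone sketch of the lookup-with-default pattern, with a Map standing in for Drill's OptionManager (names illustrative):

    import java.util.*;

    public class OptionSketch {
      static final Map<String, Boolean> session = new HashMap<>();
      static final boolean MULTIPHASE_DEFAULT = true; // was false before this commit

      static boolean isMultiPhaseAggEnabled() {
        return session.getOrDefault("planner.enable_multiphase_agg", MULTIPHASE_DEFAULT);
      }

      public static void main(String[] args) {
        System.out.println(isMultiPhaseAggEnabled()); // true: the new default
        session.put("planner.enable_multiphase_agg", false); // per-session override
        System.out.println(isMultiPhaseAggEnabled()); // false
      }
    }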
[12/14] git commit: DRILL-671: Select against hbase table with filter
against row_key fails
Posted by ja...@apache.org.
DRILL-671: Select against hbase table with filter against row_key fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6d4dc8fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6d4dc8fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6d4dc8fe
Branch: refs/heads/master
Commit: 6d4dc8fe00d4aea7f7e6a8e67ea404933d326bcf
Parents: 7388150
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Fri May 16 18:18:47 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 18:06:28 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java | 3 ++-
.../src/test/java/org/apache/drill/hbase/BaseHBaseTest.java | 2 +-
2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6d4dc8fe/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
index 0e0ccf5..924cd6e 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -53,7 +53,8 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
}
public HBaseScanSpec parseTree() {
- return mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), le.accept(this, null));
+ HBaseScanSpec parsedSpec = le.accept(this, null);
+ return parsedSpec != null ? mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec ) : null;
}
public boolean isAllExpressionsConverted() {
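
The one-line fix guards against le.accept(...) returning null when no part of the filter can be converted to an HBase scan spec; previously that null flowed straight into mergeScanSpecs and broke row_key filters. The same guard pattern in isolation, with strings standing in for HBaseScanSpec (names illustrative):

    public class GuardSketch {
      // Merge only when the visitor actually produced a spec; otherwise report null.
      static String parseTree(String existingSpec, String parsedSpec) {
        return parsedSpec != null ? merge("booleanAnd", existingSpec, parsedSpec) : null;
      }

      static String merge(String op, String a, String b) {
        return op + "(" + a + ", " + b + ")";
      }

      public static void main(String[] args) {
        System.out.println(parseTree("rowKeySpec", "filterSpec")); // booleanAnd(rowKeySpec, filterSpec)
        System.out.println(parseTree("rowKeySpec", null));         // null
      }
    }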
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6d4dc8fe/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index 48193eb..9e07d9f 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -55,7 +55,7 @@ public class BaseHBaseTest extends BaseTestQuery {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
/*
- * Change the following to HBaseTestsSuite.configure(false, false)
+ * Change the following to HBaseTestsSuite.configure(false, true)
* if you want to test against an externally running HBase cluster.
*/
HBaseTestsSuite.configure(true, true);
[06/14] git commit: DRILL-771: Remove explicit cast to varchar for
concat() function; implicit cast will be used.
Posted by ja...@apache.org.
DRILL-771: Remove explicit cast to varchar for concat() function; implicit cast will be used.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/492ec59c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/492ec59c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/492ec59c
Branch: refs/heads/master
Commit: 492ec59c17d718bfb0891a249f9e7a841372fad8
Parents: 62c0b1b
Author: Mehant Baid <me...@gmail.com>
Authored: Sun May 18 19:18:51 2014 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon May 19 10:43:31 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/planner/logical/DrillOptiq.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/492ec59c/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index 7efd714..e900fc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -301,16 +301,15 @@ public class DrillOptiq {
// Cast arguments to VARCHAR
List<LogicalExpression> concatArgs = Lists.newArrayList();
- MajorType castType = Types.required(MinorType.VARCHAR).toBuilder().setWidth(64000).build();
- concatArgs.add(FunctionCallFactory.createCast(castType, ExpressionPosition.UNKNOWN, args.get(0)));
- concatArgs.add(FunctionCallFactory.createCast(castType, ExpressionPosition.UNKNOWN, args.get(1)));
+ concatArgs.add(args.get(0));
+ concatArgs.add(args.get(1));
LogicalExpression first = FunctionCallFactory.createExpression(functionName, concatArgs);
for (int i = 2; i < args.size(); i++) {
concatArgs = Lists.newArrayList();
concatArgs.add(first);
- concatArgs.add(FunctionCallFactory.createCast(castType, ExpressionPosition.UNKNOWN, args.get(i)));
+ concatArgs.add(args.get(i));
first = FunctionCallFactory.createExpression(functionName, concatArgs);
}
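
Even without the explicit casts, the rewrite still folds an n-ary concat into nested binary calls, so concat(a, b, c) becomes concat(concat(a, b), c) and implicit casting covers any non-VARCHAR arguments. A self-contained sketch of that left-fold, with strings standing in for LogicalExpression:

    import java.util.*;

    public class ConcatFold {
      // Fold an n-ary call into nested binary calls, mirroring the loop above.
      static String fold(String fn, List<String> args) {
        String acc = fn + "(" + args.get(0) + ", " + args.get(1) + ")";
        for (int i = 2; i < args.size(); i++) {
          acc = fn + "(" + acc + ", " + args.get(i) + ")";
        }
        return acc;
      }

      public static void main(String[] args) {
        System.out.println(fold("concat", Arrays.asList("a", "b", "c")));
        // prints: concat(concat(a, b), c)
      }
    }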