You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2017/01/08 21:39:55 UTC
[3/3] drill git commit: DRILL-5039: NPE - CTAS PARTITION BY (<char-type-column>)
DRILL-5039: NPE - CTAS PARTITION BY (<char-type-column>)
close apache/drill#706
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4d4e0c2b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4d4e0c2b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4d4e0c2b
Branch: refs/heads/master
Commit: 4d4e0c2b23caead69dd4c6c02c07a9800b3c7611
Parents: 15b021f
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Fri Dec 23 17:51:38 2016 +0000
Committer: Aman Sinha <as...@maprtech.com>
Committed: Sun Jan 8 12:28:23 2017 -0800
----------------------------------------------------------------------
.../codegen/templates/NewValueFunctions.java | 73 ++++++-
.../drill/exec/store/NewValueFunction.java | 209 -------------------
.../exec/fn/impl/TestAggregateFunctions.java | 72 ++++---
.../org/apache/drill/exec/sql/TestCTAS.java | 42 +++-
.../resources/parquet/alltypes_optional.parquet | Bin 0 -> 1585 bytes
.../resources/parquet/alltypes_required.parquet | Bin 1197 -> 1585 bytes
6 files changed, 146 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/4d4e0c2b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
index 3b1d86f..5591d66 100644
--- a/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/NewValueFunctions.java
@@ -17,6 +17,12 @@
*/
<@pp.dropOutputFile />
+<#macro reassignHolder>
+ previous.buffer = buf.reallocIfNeeded(length);
+ previous.buffer.setBytes(0, in.buffer, in.start, length);
+ previous.end = length;
+</#macro>
+
<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/GNewValueFunctions.java" />
<#include "/@includes/license.ftl" />
@@ -39,28 +45,51 @@ import org.apache.drill.exec.record.RecordBatch;
*/
public class GNewValueFunctions {
<#list vv.types as type>
-<#if type.major == "Fixed" || type.major = "Bit">
-
<#list type.minor as minor>
<#list vv.modes as mode>
<#if mode.name != "Repeated">
<#if !minor.class.startsWith("Decimal28") && !minor.class.startsWith("Decimal38") && !minor.class.startsWith("Interval")>
@SuppressWarnings("unused")
-@FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
-public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleFunc{
+@FunctionTemplate(name = "newPartitionValue", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
+public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleFunc {
@Param ${mode.prefix}${minor.class}Holder in;
@Workspace ${mode.prefix}${minor.class}Holder previous;
@Workspace Boolean initialized;
@Output BitHolder out;
+ <#if type.major == "VarLen">
+ @Inject DrillBuf buf;
+ </#if>
public void setup() {
initialized = false;
+ <#if type.major == "VarLen">
+ previous.buffer = buf;
+ previous.start = 0;
+ </#if>
}
- <#if mode.name == "Required">
public void eval() {
+ <#if mode.name == "Required">
+ <#if type.major == "VarLen">
+ int length = in.end - in.start;
+
+ if (initialized) {
+ if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+ previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
+ out.value = 0;
+ } else {
+ <@reassignHolder/>
+ out.value = 1;
+ }
+ } else {
+ <@reassignHolder/>
+ out.value = 1;
+ initialized = true;
+ }
+ </#if>
+ <#if type.major == "Fixed" || type.major == "Bit">
if (initialized) {
if (in.value == previous.value) {
out.value = 0;
@@ -73,10 +102,36 @@ public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleF
out.value = 1;
initialized = true;
}
- }
</#if>
+ </#if> <#-- mode.name == "Required" -->
+
<#if mode.name == "Optional">
- public void eval() {
+ <#if type.major == "VarLen">
+ int length = in.isSet == 0 ? 0 : in.end - in.start;
+
+ if (initialized) {
+ if (previous.isSet == 0 && in.isSet == 0) {
+ out.value = 0;
+ } else if (previous.isSet != 0 && in.isSet != 0 && org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
+ previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
+ out.value = 0;
+ } else {
+ if (in.isSet == 1) {
+ <@reassignHolder/>
+ }
+ previous.isSet = in.isSet;
+ out.value = 1;
+ }
+ } else {
+ if (in.isSet == 1) {
+ <@reassignHolder/>
+ }
+ previous.isSet = in.isSet;
+ out.value = 1;
+ initialized = true;
+ }
+ </#if>
+ <#if type.major == "Fixed" || type.major == "Bit">
if (initialized) {
if (in.isSet == 0 && previous.isSet == 0) {
out.value = 0;
@@ -93,14 +148,14 @@ public static class NewValue${minor.class}${mode.prefix} implements DrillSimpleF
out.value = 1;
initialized = true;
}
- }
</#if>
+ </#if> <#-- mode.name == "Optional" -->
+ }
}
</#if> <#-- minor.class.startWith -->
</#if> <#-- mode.name -->
</#list>
</#list>
-</#if> <#-- type.major -->
</#list>
}
http://git-wip-us.apache.org/repos/asf/drill/blob/4d4e0c2b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
deleted file mode 100644
index fedb473..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NewValueFunction.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.exec.expr.DrillSimpleFunc;
-import org.apache.drill.exec.expr.annotations.FunctionTemplate;
-import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
-import org.apache.drill.exec.expr.annotations.Output;
-import org.apache.drill.exec.expr.annotations.Param;
-import org.apache.drill.exec.expr.annotations.Workspace;
-import org.apache.drill.exec.expr.holders.BitHolder;
-import org.apache.drill.exec.expr.holders.IntHolder;
-import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
-import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
-import org.apache.drill.exec.expr.holders.VarBinaryHolder;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-
-import javax.inject.Inject;
-
-/**
- * The functions are similar to those created through FreeMarker template for fixed types. There is not much benefit to
- * using code generation for generating the functions for variable length types, so simply doing them by hand.
- */
-public class NewValueFunction {
-
- @FunctionTemplate(name = "newPartitionValue",
- scope = FunctionTemplate.FunctionScope.SIMPLE,
- nulls = NullHandling.INTERNAL)
- public static class NewValueVarChar implements DrillSimpleFunc {
-
- @Param VarCharHolder in;
- @Workspace VarCharHolder previous;
- @Workspace Boolean initialized;
- @Output BitHolder out;
- @Inject DrillBuf buf;
-
- public void setup() {
- initialized = false;
- previous.buffer = buf;
- previous.start = 0;
- }
-
- public void eval() {
- int length = in.end - in.start;
-
- if (initialized) {
- if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
- out.value = 0;
- } else {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- out.value = 1;
- }
- } else {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- out.value = 1;
- initialized = true;
- }
- }
- }
-
- @FunctionTemplate(name = "newPartitionValue",
- scope = FunctionTemplate.FunctionScope.SIMPLE,
- nulls = NullHandling.INTERNAL)
- public static class NewValueVarCharNullable implements DrillSimpleFunc {
-
- @Param NullableVarCharHolder in;
- @Workspace NullableVarCharHolder previous;
- @Workspace Boolean initialized;
- @Output BitHolder out;
- @Inject DrillBuf buf;
-
- public void setup() {
- initialized = false;
- previous.buffer = buf;
- previous.start = 0;
- }
-
- public void eval() {
- int length = in.isSet == 0 ? 0 : in.end - in.start;
-
- if (initialized) {
- if (previous.isSet == 0 && in.isSet == 0 ||
- (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
- previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0)) {
- out.value = 0;
- } else {
- if (in.isSet == 1) {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- }
- previous.isSet = in.isSet;
- out.value = 1;
- }
- } else {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- previous.isSet = 1;
- out.value = 1;
- initialized = true;
- }
- }
- }
-
- @FunctionTemplate(name = "newPartitionValue",
- scope = FunctionTemplate.FunctionScope.SIMPLE,
- nulls = NullHandling.INTERNAL)
- public static class NewValueVarBinary implements DrillSimpleFunc {
-
- @Param VarBinaryHolder in;
- @Workspace VarBinaryHolder previous;
- @Workspace Boolean initialized;
- @Output BitHolder out;
- @Inject DrillBuf buf;
-
- public void setup() {
- initialized = false;
- previous.buffer = buf;
- previous.start = 0;
- }
-
- public void eval() {
- int length = in.end - in.start;
-
- if (initialized) {
- if (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0) {
- out.value = 0;
- } else {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- out.value = 1;
- }
- } else {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- out.value = 1;
- initialized = true;
- }
- }
- }
-
- @FunctionTemplate(name = "newPartitionValue",
- scope = FunctionTemplate.FunctionScope.SIMPLE,
- nulls = NullHandling.INTERNAL)
- public static class NewValueVarBinaryNullable implements DrillSimpleFunc {
-
- @Param NullableVarBinaryHolder in;
- @Workspace NullableVarBinaryHolder previous;
- @Workspace Boolean initialized;
- @Output BitHolder out;
- @Inject DrillBuf buf;
-
- public void setup() {
- initialized = false;
- previous.buffer = buf;
- previous.start = 0;
- }
-
- public void eval() {
- int length = in.isSet == 0 ? 0 : in.end - in.start;
-
- if (initialized) {
- if (previous.isSet == 0 && in.isSet == 0 ||
- (org.apache.drill.exec.expr.fn.impl.ByteFunctionHelpers.compare(
- previous.buffer, 0, previous.end, in.buffer, in.start, in.end) == 0)) {
- out.value = 0;
- } else {
- if (in.isSet == 1) {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- }
- previous.isSet = in.isSet;
- out.value = 1;
- }
- } else {
- previous.buffer = buf.reallocIfNeeded(length);
- previous.buffer.setBytes(0, in.buffer, in.start, in.end - in.start);
- previous.end = in.end - in.start;
- previous.isSet = 1;
- out.value = 1;
- initialized = true;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/4d4e0c2b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 18071a0..36ee1b9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -25,6 +25,8 @@ import org.apache.drill.PlanTestBase;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.junit.Ignore;
import org.junit.Test;
@@ -285,41 +287,51 @@ public class TestAggregateFunctions extends BaseTestQuery {
}
@Test
- public void minEmptyNonnullableInput() throws Exception {
- // test min function on required type
- String query = "select " +
- "min(bool_col) col1, min(int_col) col2, min(bigint_col) col3, min(float4_col) col4, min(float8_col) col5, " +
- "min(date_col) col6, min(time_col) col7, min(timestamp_col) col8, min(interval_year_col) col9, " +
- "min(varhcar_col) col10 " +
- "from cp.`parquet/alltypes_required.parquet` where 1 = 0";
-
- testBuilder()
- .sqlQuery(query)
- .unOrdered()
- .baselineColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10")
- .baselineValues(null, null, null, null, null, null, null, null, null, null)
- .go();
- }
+ public void minMaxEmptyNonNullableInput() throws Exception {
+ // test min and max functions on required type
+
+ final QueryDataBatch result = testSqlWithResults("select * from cp.`parquet/alltypes_required.parquet` limit 0")
+ .get(0);
+
+ final Map<String, StringBuilder> functions = Maps.newHashMap();
+ functions.put("min", new StringBuilder());
+ functions.put("max", new StringBuilder());
+
+ final Map<String, Object> resultingValues = Maps.newHashMap();
+ for (UserBitShared.SerializedField field : result.getHeader().getDef().getFieldList()) {
+ final String fieldName = field.getNamePart().getName();
+ // Only COUNT aggregate function supported for Boolean type
+ if (fieldName.equals("col_bln")) {
+ continue;
+ }
+ resultingValues.put(String.format("`%s`", fieldName), null);
+ for (Map.Entry<String, StringBuilder> function : functions.entrySet()) {
+ function.getValue()
+ .append(function.getKey())
+ .append("(")
+ .append(fieldName)
+ .append(") ")
+ .append(fieldName)
+ .append(",");
+ }
+ }
+ result.release();
- @Test
- public void maxEmptyNonnullableInput() throws Exception {
+ final String query = "select %s from cp.`parquet/alltypes_required.parquet` where 1 = 0";
+ final List<Map<String, Object>> baselineRecords = Lists.newArrayList();
+ baselineRecords.add(resultingValues);
- // test max function
- final String query = "select " +
- "max(int_col) col1, max(bigint_col) col2, max(float4_col) col3, max(float8_col) col4, " +
- "max(date_col) col5, max(time_col) col6, max(timestamp_col) col7, max(interval_year_col) col8, " +
- "max(varhcar_col) col9 " +
- "from cp.`parquet/alltypes_required.parquet` where 1 = 0";
+ for (StringBuilder selectBody : functions.values()) {
+ selectBody.setLength(selectBody.length() - 1);
- testBuilder()
- .sqlQuery(query)
- .unOrdered()
- .baselineColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9")
- .baselineValues(null, null, null, null, null, null, null, null, null)
- .go();
+ testBuilder()
+ .sqlQuery(query, selectBody.toString())
+ .unOrdered()
+ .baselineRecords(baselineRecords)
+ .go();
+ }
}
-
/*
* Streaming agg on top of a filter produces wrong results if the first two batches are filtered out.
* In the below test we have three files in the input directory and since the ordering of reading
http://git-wip-us.apache.org/repos/asf/drill/blob/4d4e0c2b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
index 5294709..9d9b403 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
@@ -17,11 +17,15 @@
*/
package org.apache.drill.exec.sql;
+import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.junit.Test;
import java.io.File;
+import java.util.Map;
public class TestCTAS extends BaseTestQuery {
@Test // DRILL-2589
@@ -125,8 +129,7 @@ public class TestCTAS extends BaseTestQuery {
try {
final String ctasQuery = String.format("CREATE TABLE %s.%s PARTITION BY AS SELECT * from cp.`region.json`", TEMP_SCHEMA, newTblName);
- errorMsgTestHelper(ctasQuery,
- String.format("PARSE ERROR: Encountered \"AS\""));
+ errorMsgTestHelper(ctasQuery,"PARSE ERROR: Encountered \"AS\"");
} finally {
FileUtils.deleteQuietly(new File(getDfsTestTmpSchemaLocation(), newTblName));
}
@@ -238,6 +241,41 @@ public class TestCTAS extends BaseTestQuery {
}
}
+ @Test
+ public void testPartitionByForAllTypes() throws Exception {
+ final String location = "partitioned_tables_with_nulls";
+ final String ctasQuery = "create table %s partition by (%s) as %s";
+ final String tablePath = "%s.`%s/%s_%s`";
+
+ // key - new table suffix, value - data query
+ final Map<String, String> variations = Maps.newHashMap();
+ variations.put("required", "select * from cp.`parquet/alltypes_required.parquet`");
+ variations.put("optional", "select * from cp.`parquet/alltypes_optional.parquet`");
+ variations.put("nulls_only", "select * from cp.`parquet/alltypes_optional.parquet` where %s is null");
+
+ try {
+ final QueryDataBatch result = testSqlWithResults("select * from cp.`parquet/alltypes_required.parquet` limit 0").get(0);
+ for (UserBitShared.SerializedField field : result.getHeader().getDef().getFieldList()) {
+ final String fieldName = field.getNamePart().getName();
+
+ for (Map.Entry<String, String> variation : variations.entrySet()) {
+ final String table = String.format(tablePath, TEMP_SCHEMA, location, fieldName, variation.getKey());
+ final String dataQuery = String.format(variation.getValue(), fieldName);
+ test(ctasQuery, table, fieldName, dataQuery, fieldName);
+ testBuilder()
+ .sqlQuery("select * from %s", table)
+ .unOrdered()
+ .sqlBaselineQuery(dataQuery)
+ .build()
+ .run();
+ }
+ }
+ result.release();
+ } finally {
+ FileUtils.deleteQuietly(new File(getDfsTestTmpSchemaLocation(), location));
+ }
+ }
+
private static void ctasErrorTestHelper(final String ctasSql, final String expErrorMsg) throws Exception {
final String createTableSql = String.format(ctasSql, TEMP_SCHEMA, "testTableName");
errorMsgTestHelper(createTableSql, expErrorMsg);
http://git-wip-us.apache.org/repos/asf/drill/blob/4d4e0c2b/exec/java-exec/src/test/resources/parquet/alltypes_optional.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/alltypes_optional.parquet b/exec/java-exec/src/test/resources/parquet/alltypes_optional.parquet
new file mode 100644
index 0000000..53f5fa1
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/alltypes_optional.parquet differ
http://git-wip-us.apache.org/repos/asf/drill/blob/4d4e0c2b/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet b/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet
index 549e316..efc6add 100644
Binary files a/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet and b/exec/java-exec/src/test/resources/parquet/alltypes_required.parquet differ