You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/18 06:22:12 UTC
[02/13] git commit: DRILL-836: [addendum] Drill needs to return
complex types (e.g., map and array) as a JSON string
DRILL-836: [addendum] Drill needs to return complex types (e.g., map and array) as a JSON string
* This contains additional changes to the original patch which was merged.
+ Renamed "flatten" to "complex-to-json"
+ With the new patch, we return VARCHAR instead of VARBINARY.
+ Added test case.
+ Minor code re-factoring.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2e07b0b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2e07b0b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2e07b0b8
Branch: refs/heads/master
Commit: 2e07b0b8ab4fe7fdb1ad51ee47ab10aeec94b7f9
Parents: 4d17aea
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon Jun 16 15:01:11 2014 -0700
Committer: Aditya Kishore <ad...@maprtech.com>
Committed: Tue Jun 17 00:06:41 2014 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../drill/common/util/DrillStringUtils.java | 176 +++++++++++++++++++
.../org/apache/drill/hbase/BaseHBaseTest.java | 28 ++-
.../drill/hbase/HBaseRecordReaderTest.java | 6 +-
.../org/apache/drill/hbase/HBaseTestsSuite.java | 6 +-
.../drill/hbase/TestHBaseCFAsJSONString.java | 54 ++++++
.../drill/hbase/TestHBaseFilterPushDown.java | 12 +-
.../drill/hbase/TestHBaseProjectPushDown.java | 8 +-
.../resources/bootstrap-storage-plugins.json | 1 +
.../src/main/codegen/includes/vv_imports.ftl | 1 +
.../main/codegen/templates/SqlAccessors.java | 2 +-
.../apache/drill/exec/client/DrillClient.java | 16 +-
.../exec/expr/fn/impl/StringFunctionUtil.java | 71 --------
.../exec/expr/fn/impl/StringFunctions.java | 2 +-
.../exec/physical/config/ComplexToJson.java | 59 +++++++
.../drill/exec/physical/config/Flatten.java | 59 -------
.../impl/project/ComplexToJsonBatchCreator.java | 42 +++++
.../impl/project/FlattenBatchCreator.java | 42 -----
.../impl/project/ProjectBatchCreator.java | 3 +-
.../impl/project/ProjectRecordBatch.java | 62 ++++---
.../planner/fragment/SimpleParallelizer.java | 6 +-
.../planner/physical/ComplexToJsonPrel.java | 69 ++++++++
.../exec/planner/physical/FlattenPrel.java | 61 -------
.../visitor/ComplexToJsonPrelVisitor.java | 40 +++++
.../physical/visitor/FlattenPrelVisitor.java | 40 -----
.../planner/sql/handlers/DefaultSqlHandler.java | 6 +-
.../org/apache/drill/exec/rpc/BasicClient.java | 4 +-
.../org/apache/drill/exec/rpc/RpcException.java | 4 +-
.../apache/drill/exec/rpc/user/UserClient.java | 20 ++-
.../apache/drill/exec/rpc/user/UserServer.java | 8 +-
.../apache/drill/exec/rpc/user/UserSession.java | 65 +++++--
.../org/apache/drill/exec/util/ConvertUtil.java | 7 +-
.../org/apache/drill/exec/util/VectorUtil.java | 21 +--
.../java/org/apache/drill/PlanningBase.java | 5 +-
.../exec/physical/impl/TestOptiqPlans.java | 2 +-
exec/jdbc/pom.xml | 1 +
.../apache/drill/exec/proto/UserBitShared.java | 18 +-
.../exec/proto/beans/CoreOperatorType.java | 4 +-
protocol/src/main/protobuf/UserBitShared.proto | 2 +-
39 files changed, 645 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1092a7d..838ea6b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
.project
.buildpath
.classpath
+.checkstyle
.settings/
.idea/
*.log
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
new file mode 100644
index 0000000..96b5776
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+
+import io.netty.buffer.ByteBuf;
+
+public class DrillStringUtils {
+ /**
+ * Unescapes any Java literals found in the {@code String}.
+ * For example, it will turn a sequence of {@code '\'} and
+ * {@code 'n'} into a newline character, unless the {@code '\'}
+ * is preceded by another {@code '\'}.
+ *
+ * @param input the {@code String} to unescape, may be null
+ * @return a new unescaped {@code String}, {@code null} if null string input
+ */
+ public static final String unescapeJava(String input) {
+ return StringEscapeUtils.unescapeJava(input);
+ }
+
+ /**
+ * Escapes the characters in a {@code String} using Java String rules.
+ *
+ * Deals correctly with quotes and control-chars (tab, backslash, cr, ff, etc.)
+ *
+ * So a tab becomes the characters {@code '\\'} and
+ * {@code 't'}.
+ *
+ * Example:
+ * <pre>
+ * input string: He didn't say, "Stop!"
+ * output string: He didn't say, \"Stop!\"
+ * </pre>
+ *
+ * @param input String to escape values in, may be null
+ * @return String with escaped values, {@code null} if null string input
+ */
+ public static final String escapeJava(String input) {
+ return StringEscapeUtils.escapeJava(input);
+ }
+
+ public static final String escapeNewLines(String input) {
+ if (input == null) {
+ return null;
+ }
+ StringBuilder result = new StringBuilder();
+ boolean sawNewline = false;
+ for (int i = 0; i < input.length(); i++) {
+ char curChar = input.charAt(i);
+ if (curChar == '\r' || curChar == '\n') {
+ if (sawNewline) {
+ continue;
+ }
+ sawNewline = true;
+ result.append("\\n");
+ } else {
+ sawNewline = false;
+ result.append(curChar);
+ }
+ }
+ return result.toString();
+ }
+
+ /**
+ * Return a printable representation of a byte buffer, escaping the non-printable
+ * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
+ *
+ * This function does not modify the {@code readerIndex} and {@code writerIndex}
+ * of the byte buffer.
+ */
+ public static String toBinaryString(ByteBuf buf, int strStart, int strEnd) {
+ StringBuilder result = new StringBuilder();
+ for (int i = strStart; i < strEnd ; ++i) {
+ appendByte(result, buf.getByte(i));
+ }
+ return result.toString();
+ }
+
+ /**
+ * Return a printable representation of a byte array, escaping the non-printable
+ * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
+ */
+ public static String toBinaryString(byte[] buf) {
+ return toBinaryString(buf, 0, buf.length);
+ }
+
+ /**
+ * Return a printable representation of a byte array, escaping the non-printable
+ * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
+ */
+ public static String toBinaryString(byte[] buf, int strStart, int strEnd) {
+ StringBuilder result = new StringBuilder();
+ for (int i = strStart; i < strEnd ; ++i) {
+ appendByte(result, buf[i]);
+ }
+ return result.toString();
+ }
+
+ private static void appendByte(StringBuilder result, byte b) {
+ int ch = b & 0xFF;
+ if ( (ch >= '0' && ch <= '9')
+ || (ch >= 'A' && ch <= 'Z')
+ || (ch >= 'a' && ch <= 'z')
+ || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) {
+ result.append((char)ch);
+ } else {
+ result.append(String.format("\\x%02X", ch));
+ }
+ }
+
+ /**
+ * In-place parsing of a hex encoded binary string.
+ *
+ * This function does not modify the {@code readerIndex} and {@code writerIndex}
+ * of the byte buffer.
+ *
+ * @return Index in the byte buffer just after the last written byte.
+ */
+ public static int parseBinaryString(ByteBuf str, int strStart, int strEnd) {
+ int length = (strEnd - strStart);
+ int dstEnd = strStart;
+ for (int i = strStart; i < length ; i++) {
+ byte b = str.getByte(i);
+ if (b == '\\'
+ && length > i+3
+ && (str.getByte(i+1) == 'x' || str.getByte(i+1) == 'X')) {
+ // ok, take next 2 hex digits.
+ byte hd1 = str.getByte(i+2);
+ byte hd2 = str.getByte(i+3);
+ if (isHexDigit(hd1) && isHexDigit(hd2)) { // [a-fA-F0-9]
+ // turn hex ASCII digit -> number
+ b = (byte) ((toBinaryFromHex(hd1) << 4) + toBinaryFromHex(hd2));
+ i += 3; // skip 3
+ }
+ }
+ str.setByte(dstEnd++, b);
+ }
+ return dstEnd;
+ }
+
+ /**
+ * Takes a ASCII digit in the range A-F0-9 and returns
+ * the corresponding integer/ordinal value.
+ * @param ch The hex digit.
+ * @return The converted hex value as a byte.
+ */
+ private static byte toBinaryFromHex(byte ch) {
+ if ( ch >= 'A' && ch <= 'F' )
+ return (byte) ((byte)10 + (byte) (ch - 'A'));
+ else if ( ch >= 'a' && ch <= 'f' )
+ return (byte) ((byte)10 + (byte) (ch - 'a'));
+ return (byte) (ch - '0');
+ }
+
+ private static boolean isHexDigit(byte c) {
+ return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9');
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index dbeced3..e6a5474 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -42,6 +42,8 @@ import com.google.common.io.Files;
public class BaseHBaseTest extends BaseTestQuery {
+ private static final String HBASE_STORAGE_PLUGIN_NAME = "hbase";
+
protected static Configuration conf = HBaseConfiguration.create();
protected static HBaseStoragePlugin storagePlugin;
@@ -66,10 +68,11 @@ public class BaseHBaseTest extends BaseTestQuery {
HBaseTestsSuite.configure(true, true);
HBaseTestsSuite.initCluster();
- storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase");
+ storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin(HBASE_STORAGE_PLUGIN_NAME);
storagePluginConfig = storagePlugin.getConfig();
-
+ storagePluginConfig.setEnabled(true);
storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
+ bit.getContext().getStorage().createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, storagePluginConfig, true);
}
@AfterClass
@@ -91,20 +94,24 @@ public class BaseHBaseTest extends BaseTestQuery {
.replace("[TABLE_NAME]", tableName);
}
- protected void runPhysicalVerifyCount(String planFile, String tableName, int expectedRowCount) throws Exception{
+ protected void runHBasePhysicalVerifyCount(String planFile, String tableName, int expectedRowCount) throws Exception{
String physicalPlan = getPlanText(planFile, tableName);
List<QueryResultBatch> results = testPhysicalWithResults(physicalPlan);
printResultAndVerifyRowCount(results, expectedRowCount);
}
- protected void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
- sql = canonizeSQL(sql);
+ protected List<QueryResultBatch> runHBaseSQLlWithResults(String sql) throws Exception {
+ sql = canonizeHBaseSQL(sql);
System.out.println("Running query:\n" + sql);
- List<QueryResultBatch> results = testSqlWithResults(sql);
+ return testSqlWithResults(sql);
+ }
+
+ protected void runHBaseSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
+ List<QueryResultBatch> results = runHBaseSQLlWithResults(sql);
printResultAndVerifyRowCount(results, expectedRowCount);
}
- private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException {
+ protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException {
int rowCount = 0;
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
for(QueryResultBatch result : results){
@@ -118,12 +125,17 @@ public class BaseHBaseTest extends BaseTestQuery {
result.release();
}
System.out.println("Total record count: " + rowCount);
+ return rowCount;
+ }
+
+ private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException {
+ int rowCount = printResult(results);
if (expectedRowCount != -1) {
Assert.assertEquals(expectedRowCount, rowCount);
}
}
- protected String canonizeSQL(String sql) {
+ protected String canonizeHBaseSQL(String sql) {
return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
index 1462b81..60db266 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
@@ -24,19 +24,19 @@ public class HBaseRecordReaderTest extends BaseHBaseTest {
@Test
public void testLocalDistributed() throws Exception {
String planName = "/hbase/hbase_scan_screen_physical.json";
- runPhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 6);
+ runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 6);
}
@Test
public void testLocalDistributedColumnSelect() throws Exception {
String planName = "/hbase/hbase_scan_screen_physical_column_select.json";
- runPhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 2);
+ runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 2);
}
@Test
public void testLocalDistributedFamilySelect() throws Exception {
String planName = "/hbase/hbase_scan_screen_physical_family_select.json";
- runPhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 3);
+ runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 3);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index a24215d..18cf87c 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -38,7 +38,8 @@ import org.junit.runners.Suite.SuiteClasses;
TestHBaseFilterPushDown.class,
TestHBaseProjectPushDown.class,
TestHBaseRegionScanAssignments.class,
- TestHBaseTableProvider.class
+ TestHBaseTableProvider.class,
+ TestHBaseCFAsJSONString.class
})
public class HBaseTestsSuite {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class);
@@ -55,6 +56,9 @@ public class HBaseTestsSuite {
private static volatile AtomicInteger initCount = new AtomicInteger(0);
+ /**
+ * This flag controls whether {@link HBaseTestsSuite} starts a mini HBase cluster to run the unit test.
+ */
private static boolean manageHBaseCluster = System.getProperty("drill.hbase.tests.manageHBaseCluster", "true").equalsIgnoreCase("true");
private static boolean hbaseClusterCreated = false;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
new file mode 100644
index 0000000..9cc0356
--- /dev/null
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHBaseCFAsJSONString extends BaseHBaseTest {
+
+ private static DrillClient parent_client;
+
+ @BeforeClass
+ public static void openMyClient() throws Exception {
+ parent_client = client;
+ client = new DrillClient(config, serviceSet.getCoordinator());
+ client.setSupportComplexTypes(false);
+ client.connect();
+ }
+
+ @AfterClass
+ public static void closeClient() throws IOException {
+ if(client != null) client.close();
+ client = parent_client;
+ }
+
+ @Test
+ public void testColumnFamiliesAsJSONString() throws Exception {
+ setColumnWidths(new int[] {112, 12});
+ List<QueryResultBatch> resultList = runHBaseSQLlWithResults("SELECT f, f2 FROM hbase.`[TABLE_NAME]` tableName LIMIT 1");
+ printResult(resultList);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 50dddeb..29e7033 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -24,7 +24,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
@Test
public void testFilterPushDownRowKeyEqual() throws Exception {
setColumnWidths(new int[] {8, 38, 38});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
@@ -36,7 +36,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
@Test
public void testFilterPushDownRowKeyGreaterThan() throws Exception {
setColumnWidths(new int[] {8, 38, 38});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
@@ -48,7 +48,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
@Test
public void testFilterPushDownRowKeyBetween() throws Exception {
setColumnWidths(new int[] {8, 74, 38});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
@@ -60,7 +60,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
@Test
public void testFilterPushDownMultiColumns() throws Exception {
setColumnWidths(new int[] {8, 74, 38});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` t\n"
@@ -72,7 +72,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
@Test
public void testFilterPushDownConvertExpression() throws Exception {
setColumnWidths(new int[] {8, 38, 38});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
@@ -84,7 +84,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
@Test
public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception {
setColumnWidths(new int[] {8, 74, 38});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ " *\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName\n"
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index ce6f865..6efe905 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -24,7 +24,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
@Test
public void testRowKeyPushDown() throws Exception{
setColumnWidth(8);
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ "row_key\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName"
@@ -34,7 +34,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
@Test
public void testColumnWith1RowPushDown() throws Exception{
setColumnWidth(6);
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ "t.f2.c7 as `t.f2.c7`\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` t"
@@ -44,7 +44,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
@Test
public void testRowKeyAndColumnPushDown() throws Exception{
setColumnWidths(new int[] {8, 9, 6, 2, 6});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ "row_key, t.f.c1*31 as `t.f.c1*31`, t.f.c2 as `t.f.c2`, 5 as `5`, 'abc' as `'abc'`\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` t"
@@ -54,7 +54,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
@Test
public void testColumnFamilyPushDown() throws Exception{
setColumnWidths(new int[] {8, 74, 38});
- runSQLVerifyCount("SELECT\n"
+ runHBaseSQLVerifyCount("SELECT\n"
+ "row_key, f, f2\n"
+ "FROM\n"
+ " hbase.`[TABLE_NAME]` tableName"
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
index 0e93f7e..3e0e8c0 100644
--- a/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
@@ -2,6 +2,7 @@
"storage":{
hbase : {
type:"hbase",
+ enabled: false,
config : {
"hbase.zookeeper.quorum" : "localhost",
"hbase.zookeeper.property.clientPort" : 2181
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index 9e16116..872c0b8 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -33,6 +33,7 @@ import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.types.TypeProtos.*;
import org.apache.drill.common.types.Types;
+import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.vector.complex.*;
import org.apache.drill.exec.vector.complex.reader.*;
import org.apache.drill.exec.vector.complex.impl.*;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/SqlAccessors.java b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
index d05b7fd..a5b251e 100644
--- a/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
+++ b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
@@ -69,7 +69,7 @@ public class ${name}Accessor extends AbstractSqlAccessor{
<#case "VarBinary">
public String getString(int index) {
byte [] b = ac.get(index);
- return StringFunctionUtil.toBinaryString(io.netty.buffer.Unpooled.wrappedBuffer(b), 0, b.length);
+ return DrillStringUtils.toBinaryString(io.netty.buffer.Unpooled.wrappedBuffer(b), 0, b.length);
}
<#break>
<#case "VarChar">
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 7dc7702..7d7f2ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -72,6 +72,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
private final BufferAllocator allocator;
private int reconnectTimes;
private int reconnectDelay;
+ private boolean supportComplexTypes = true;
private final boolean ownsZkConnection;
private final boolean ownsAllocator;
@@ -110,7 +111,18 @@ public class DrillClient implements Closeable, ConnectionThrottle{
client.setAutoRead(enableAutoRead);
}
-
+ /**
+ * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set.
+ * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
+ *
+ * @throws IllegalStateException if called after a connection has been established.
+ */
+ public void setSupportComplexTypes(boolean supportComplexTypes) {
+ if (connected) {
+ throw new IllegalStateException("Attempted to modify client connection property after connection has been established.");
+ }
+ this.supportComplexTypes = supportComplexTypes;
+ }
/**
* Connects the client to a Drillbit server
@@ -181,7 +193,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
private void connect(DrillbitEndpoint endpoint) throws RpcException {
FutureHandler f = new FutureHandler();
try {
- client.connect(f, endpoint, props);
+ client.setSupportComplexTypes(supportComplexTypes).connect(f, endpoint, props);
f.checkedGet();
} catch (InterruptedException e) {
throw new RpcException(e);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
index fbdab8e..844a3e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
@@ -72,77 +72,6 @@ public class StringFunctionUtil {
return -1;
}
- /**
- * Return a printable representation of a byte buffer, escaping the non-printable
- * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
- *
- * This function does not modify the {@code readerIndex} and {@code writerIndex}
- * of the byte buffer.
- */
- public static String toBinaryString(ByteBuf buf, int strStart, int strEnd) {
- StringBuilder result = new StringBuilder();
- for (int i = strStart; i < strEnd ; ++i) {
- int ch = buf.getByte(i) & 0xFF;
- if ( (ch >= '0' && ch <= '9')
- || (ch >= 'A' && ch <= 'Z')
- || (ch >= 'a' && ch <= 'z')
- || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) {
- result.append((char)ch);
- } else {
- result.append(String.format("\\x%02X", ch));
- }
- }
- return result.toString();
- }
-
- /**
- * In-place parsing of a hex encoded binary string.
- *
- * This function does not modify the {@code readerIndex} and {@code writerIndex}
- * of the byte buffer.
- *
- * @return Index in the byte buffer just after the last written byte.
- */
- public static int parseBinaryString(ByteBuf str, int strStart, int strEnd) {
- int length = (strEnd - strStart);
- int dstEnd = strStart;
- for (int i = strStart; i < length ; i++) {
- byte b = str.getByte(i);
- if (b == '\\'
- && length > i+3
- && (str.getByte(i+1) == 'x' || str.getByte(i+1) == 'X')) {
- // ok, take next 2 hex digits.
- byte hd1 = str.getByte(i+2);
- byte hd2 = str.getByte(i+3);
- if (isHexDigit(hd1) && isHexDigit(hd2)) { // [a-fA-F0-9]
- // turn hex ASCII digit -> number
- b = (byte) ((toBinaryFromHex(hd1) << 4) + toBinaryFromHex(hd2));
- i += 3; // skip 3
- }
- }
- str.setByte(dstEnd++, b);
- }
- return dstEnd;
- }
-
- /**
- * Takes a ASCII digit in the range A-F0-9 and returns
- * the corresponding integer/ordinal value.
- * @param ch The hex digit.
- * @return The converted hex value as a byte.
- */
- private static byte toBinaryFromHex(byte ch) {
- if ( ch >= 'A' && ch <= 'F' )
- return (byte) ((byte)10 + (byte) (ch - 'A'));
- else if ( ch >= 'a' && ch <= 'f' )
- return (byte) ((byte)10 + (byte) (ch - 'a'));
- return (byte) (ch - '0');
- }
-
- private static boolean isHexDigit(byte c) {
- return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9');
- }
-
private static int utf8CharLen(ByteBuf buffer, int idx) {
byte firstByte = buffer.getByte(idx);
if (firstByte >= 0) { // 1-byte char. First byte is 0xxxxxxx.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
index 51a7dbb..33f2c94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
@@ -928,7 +928,7 @@ public class StringFunctions{
public void eval() {
out.buffer = in.buffer;
out.start = in.start;
- out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.parseBinaryString(in.buffer, in.start, in.end);
+ out.end = org.apache.drill.common.util.DrillStringUtils.parseBinaryString(in.buffer, in.start, in.end);
out.buffer.readerIndex(out.start);
out.buffer.writerIndex(out.end);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java
new file mode 100644
index 0000000..480f84e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("complex-to-json")
+public class ComplexToJson extends AbstractSingle {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJson.class);
+
+ @JsonCreator
+ public ComplexToJson(@JsonProperty("child") PhysicalOperator child) {
+ super(child);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+ return physicalVisitor.visitOp(this, value);
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new ComplexToJson(child);
+ }
+
+ @Override
+ public SelectionVectorMode getSVMode() {
+ return child.getSVMode();
+ }
+
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.COMPLEX_TO_JSON_VALUE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
deleted file mode 100644
index d123d2b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.config;
-
-import org.apache.drill.exec.physical.base.AbstractSingle;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("flatten")
-public class Flatten extends AbstractSingle {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Flatten.class);
-
- @JsonCreator
- public Flatten(@JsonProperty("child") PhysicalOperator child) {
- super(child);
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
- return physicalVisitor.visitOp(this, value);
- }
-
- @Override
- protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new Flatten(child);
- }
-
- @Override
- public SelectionVectorMode getSVMode() {
- return child.getSVMode();
- }
-
- @Override
- public int getOperatorType() {
- return CoreOperatorType.FLATTEN_VALUE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
new file mode 100644
index 0000000..0df9491
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.ComplexToJson;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 1);
+ return new ProjectRecordBatch(new Project(null, flatten.getChild()),
+ children.iterator().next(),
+ context);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
deleted file mode 100644
index 9bea73c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.project;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Flatten;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-import com.google.common.base.Preconditions;
-
-public class FlattenBatchCreator implements BatchCreator<Flatten> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
-
- @Override
- public RecordBatch getBatch(FragmentContext context, Flatten flatten, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.size() == 1);
- return new ProjectRecordBatch(new Project(null, flatten.getChild()),
- children.iterator().next(),
- context);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
index 929071d..cb1d4f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
@@ -35,6 +35,5 @@ public class ProjectBatchCreator implements BatchCreator<Project>{
Preconditions.checkArgument(children.size() == 1);
return new ProjectRecordBatch(config, children.iterator().next(), context);
}
-
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index f5a4444..5ee01f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -25,12 +25,16 @@ import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.FunctionCallFactory;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.fn.CastFunctions;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -98,9 +102,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
protected void doWork() {
// VectorUtil.showVectorAccessibleContent(incoming, ",");
int incomingRecordCount = incoming.getRecordCount();
-
+
doAlloc();
-
+
int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
if (outputRecords < incomingRecordCount) {
setValueCount(outputRecords);
@@ -114,8 +118,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
this.recordCount = outputRecords;
}
- // In case of complex writer expression, vectors would be added to batch run-time.
- // We have to re-build the schema.
+ // In case of complex writer expression, vectors would be added to batch run-time.
+ // We have to re-build the schema.
if (complexWriters != null) {
container.buildSchema(SelectionVectorMode.NONE);
}
@@ -138,17 +142,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
this.recordCount = remainingRecordCount;
}
- // In case of complex writer expression, vectors would be added to batch run-time.
- // We have to re-build the schema.
+ // In case of complex writer expression, vectors would be added to batch run-time.
+ // We have to re-build the schema.
if (complexWriters != null) {
container.buildSchema(SelectionVectorMode.NONE);
- }
+ }
}
public void addComplexWriter(ComplexWriter writer) {
complexWriters.add(writer);
}
-
+
private boolean doAlloc() {
//Allocate vv in the allocationVectors.
for(ValueVector v : this.allocationVectors){
@@ -156,17 +160,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
if (!v.allocateNewSafe())
return false;
}
-
+
//Allocate vv for complexWriters.
if (complexWriters == null)
return true;
-
+
for (ComplexWriter writer : complexWriters)
writer.allocate();
-
+
return true;
}
-
+
private void setValueCount(int count) {
for(ValueVector v : allocationVectors){
ValueVector.Mutator m = v.getMutator();
@@ -177,9 +181,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
return;
for (ComplexWriter writer : complexWriters)
- writer.setValueCount(count);
+ writer.setValueCount(count);
}
-
+
/** hack to make ref and full work together... need to figure out if this is still necessary. **/
private FieldReference getRef(NamedExpression e){
FieldReference ref = e.getRef();
@@ -259,16 +263,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
container.add(tp.getTo());
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
// logger.debug("Added transfer.");
- } else if (expr instanceof DrillFuncHolderExpr &&
- ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
- // Need to process ComplexWriter function evaluation.
- // Lazy initialization of the list of complex writers, if not done yet.
+ } else if (expr instanceof DrillFuncHolderExpr &&
+ ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
+ // Need to process ComplexWriter function evaluation.
+ // Lazy initialization of the list of complex writers, if not done yet.
if (complexWriters == null)
complexWriters = Lists.newArrayList();
-
- // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
+
+ // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
- cg.addExpr(expr);
+ cg.addExpr(expr);
} else{
// need to do evaluation.
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -303,10 +307,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
List<NamedExpression> exprs = Lists.newArrayList();
for (MaterializedField field : incoming.getSchema()) {
if (Types.isComplex(field.getType())) {
- exprs.add(new NamedExpression(
- FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN),
- new FieldReference(field.getPath()))
- );
+ LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
+ String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
+ List<LogicalExpression> castArgs = Lists.newArrayList();
+ castArgs.add(convertToJson); //input_expr
+ /*
+ * We are implicitly casting to VARCHAR so we don't have a max length,
+ * using an arbitrary value. We trim down the size of the stored bytes
+ * to the actual size so this size doesn't really matter.
+ */
+ castArgs.add(new ValueExpressions.LongExpression(65536, null)); //
+ FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
+ exprs.add(new NamedExpression(castCall, new FieldReference(field.getPath())));
} else {
exprs.add(new NamedExpression(field.getPath(), new FieldReference(field.getPath())));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 053580b..0ce480d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.planner.fragment;
import java.util.Collection;
import java.util.List;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
@@ -158,11 +158,11 @@ public class SimpleParallelizer {
.build();
if (isRootNode) {
- logger.debug("Root fragment:\n {}", StringEscapeUtils.unescapeJava(fragment.toString()));
+ logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
rootFragment = fragment;
rootOperator = root;
} else {
- logger.debug("Remote fragment:\n {}", StringEscapeUtils.unescapeJava(fragment.toString()));
+ logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
fragments.add(fragment);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java
new file mode 100644
index 0000000..b6bedb6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ComplexToJson;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class ComplexToJsonPrel extends SingleRel implements Prel {
+
+ public ComplexToJsonPrel(Prel phyRelNode) {
+ super(phyRelNode.getCluster(), phyRelNode.getTraitSet(), phyRelNode);
+ }
+
+ @Override
+ public final RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new ComplexToJsonPrel((Prel) sole(inputs));
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ ComplexToJson p = new ComplexToJson(((Prel) getChild()).getPhysicalOperator(creator));
+ return creator.addMetadata(this, p);
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return PrelUtil.iter(getChild());
+ }
+
+ @Override
+ public SelectionVectorMode[] getSupportedEncodings() {
+ return SelectionVectorMode.DEFAULT;
+ }
+
+ @Override
+ public SelectionVectorMode getEncoding() {
+ return SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitPrel(this, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
deleted file mode 100644
index 3c19bae..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.physical;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.Flatten;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.eigenbase.rel.SingleRel;
-
-public class FlattenPrel extends SingleRel implements Prel {
-
- public FlattenPrel(Prel phyRelNode) {
- super(phyRelNode.getCluster(), phyRelNode.getTraitSet(), phyRelNode);
- }
-
- @Override
- public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- Flatten p = new Flatten(((Prel) getChild()).getPhysicalOperator(creator));
- return creator.addMetadata(this, p);
- }
-
- @Override
- public Iterator<Prel> iterator() {
- return PrelUtil.iter(getChild());
- }
-
- @Override
- public SelectionVectorMode[] getSupportedEncodings() {
- return SelectionVectorMode.DEFAULT;
- }
-
- @Override
- public SelectionVectorMode getEncoding() {
- return SelectionVectorMode.NONE;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
- return logicalVisitor.visitPrel(this, value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java
new file mode 100644
index 0000000..37b2f8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import java.util.Collections;
+
+import org.apache.drill.exec.planner.physical.ComplexToJsonPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.eigenbase.rel.RelNode;
+
+public class ComplexToJsonPrelVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
+
+ private static final ComplexToJsonPrelVisitor INSTANCE = new ComplexToJsonPrelVisitor();
+
+ public static Prel addComplexToJsonPrel(Prel prel) {
+ return prel.accept(INSTANCE, null);
+ }
+
+ @Override
+ public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
+ return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)new ComplexToJsonPrel((Prel)prel.getChild())));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
deleted file mode 100644
index 5892782..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.physical.visitor;
-
-import java.util.Collections;
-
-import org.apache.drill.exec.planner.physical.FlattenPrel;
-import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ScreenPrel;
-import org.eigenbase.rel.RelNode;
-
-public class FlattenPrelVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
-
- private static final FlattenPrelVisitor INSTANCE = new FlattenPrelVisitor();
-
- public static Prel addFlattenPrel(Prel prel) {
- return prel.accept(INSTANCE, null);
- }
-
- @Override
- public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
- return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)new FlattenPrel((Prel)prel.getChild())));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 78dadbf..21420df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -44,7 +44,7 @@ import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
-import org.apache.drill.exec.planner.physical.visitor.FlattenPrelVisitor;
+import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor;
@@ -182,8 +182,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
* insert a project which which would convert
*/
if (!context.getSession().isSupportComplexTypes()) {
- logger.debug("Client does not support complex types, add Flatten operator.");
- phyRelNode = FlattenPrelVisitor.addFlattenPrel(phyRelNode);
+ logger.debug("Client does not support complex types, add ComplexToJson operator.");
+ phyRelNode = ComplexToJsonPrelVisitor.addComplexToJsonPrel(phyRelNode);
}
/* 6.)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 2a3266a..562fa90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -90,7 +90,9 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
public boolean isActive(){
- return connection.getChannel().isActive() ;
+ return connection != null
+ && connection.getChannel() != null
+ && connection.getChannel().isActive() ;
}
protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
index 9b5eb1d..3d8f02b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.rpc;
import java.util.concurrent.ExecutionException;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.drill.common.exceptions.DrillIOException;
+import org.apache.drill.common.util.DrillStringUtils;
/**
* Parent class for all rpc exceptions.
@@ -38,7 +38,7 @@ public class RpcException extends DrillIOException{
}
private static String format(String message) {
- return StringEscapeUtils.unescapeJava(message);
+ return DrillStringUtils.unescapeJava(message);
}
public RpcException(String message) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 277bb0c..ad885f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -44,6 +44,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
private final QueryResultHandler queryResultHandler = new QueryResultHandler();
+ private boolean supportComplexTypes = true;
+
public UserClient(BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
}
@@ -57,7 +59,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
- .setSupportComplexTypes(true);
+ .setSupportComplexTypes(supportComplexTypes);
if (props != null) {
hsBuilder.setProperties(props);
@@ -104,10 +106,24 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
@Override
protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithConnection.ServerConnection connection) {
}
-
+
@Override
public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
+ /**
+ * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set.
+ * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
+ *
+ * @throws IllegalStateException if called after a connection has been established.
+ */
+ public UserClient setSupportComplexTypes(boolean supportComplexTypes) {
+ if (isActive()) {
+ throw new IllegalStateException("Attempted to modify connection property after connection has been established.");
+ }
+ this.supportComplexTypes = supportComplexTypes;
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index e96ba6c..aaf3c2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -111,8 +111,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
void setUser(UserToBitHandshake inbound) throws IOException {
- session = new UserSession(worker.getSystemOptions(), inbound.getCredentials(), inbound.getProperties());
- session.setSupportComplexTypes(inbound.getSupportComplexTypes());
+ session = UserSession.Builder.newBuilder()
+ .withCredentials(inbound.getCredentials())
+ .withOptionManager(worker.getSystemOptions())
+ .withUserProperties(inbound.getProperties())
+ .setSupportComplexTypes(inbound.getSupportComplexTypes())
+ .build();
}
public UserSession getSession(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 18e365e..13414da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.rpc.user;
-import java.io.IOException;
import java.util.Map;
import net.hydromatic.optiq.SchemaPlus;
@@ -42,18 +41,56 @@ public class UserSession {
private Map<String, String> properties;
private OptionManager options;
- public UserSession(OptionManager systemOptions, UserCredentials credentials, UserProperties properties) throws IOException{
- this.credentials = credentials;
- this.options = new SessionOptionManager(systemOptions);
- this.properties = Maps.newHashMap();
+ public static class Builder {
+ UserSession userSession;
- if (properties == null) return;
- for (int i=0; i<properties.getPropertiesCount(); i++) {
- Property prop = properties.getProperties(i);
- this.properties.put(prop.getKey(), prop.getValue());
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public Builder withCredentials(UserCredentials credentials) {
+ userSession.credentials = credentials;
+ return this;
+ }
+
+ public Builder withOptionManager(OptionManager systemOptions) {
+ userSession.options = new SessionOptionManager(systemOptions);
+ return this;
+ }
+
+ public Builder withUserProperties(UserProperties properties) {
+ userSession.properties = Maps.newHashMap();
+ if (properties != null) {
+ for (int i = 0; i < properties.getPropertiesCount(); i++) {
+ Property prop = properties.getProperties(i);
+ userSession.properties.put(prop.getKey(), prop.getValue());
+ }
+ }
+ return this;
+ }
+
+ public Builder setSupportComplexTypes(boolean supportComplexTypes) {
+ userSession.supportComplexTypes = supportComplexTypes;
+ return this;
+ }
+
+ public UserSession build() {
+ UserSession session = userSession;
+ userSession = null;
+ return session;
+ }
+
+ Builder() {
+ userSession = new UserSession();
}
}
+ private UserSession() { }
+
+ public boolean isSupportComplexTypes() {
+ return supportComplexTypes;
+ }
+
public OptionManager getOptions(){
return options;
}
@@ -62,7 +99,6 @@ public class UserSession {
return user;
}
-
/**
* Update the schema path for the session.
* @param fullPath The desired path to set to.
@@ -107,13 +143,4 @@ public class UserSession {
return schema;
}
- public boolean isSupportComplexTypes() {
- return supportComplexTypes;
- }
-
- public UserSession setSupportComplexTypes(boolean supportComplexType) {
- this.supportComplexTypes = supportComplexType;
- return this;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
index 750885c..ffcb7d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
@@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
import java.io.DataInput;
+import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.expr.fn.impl.StringFunctionUtil;
public class ConvertUtil {
@@ -45,7 +46,7 @@ public class ConvertUtil {
int actualLen = (end - start);
if (actualLen != requiredLen) {
throw new IllegalArgumentException(String.format("Wrong length %d(%d-%d) in the buffer '%s', expected %d.",
- actualLen, end, start, StringFunctionUtil.toBinaryString(buffer, start, end), requiredLen));
+ actualLen, end, start, DrillStringUtils.toBinaryString(buffer, start, end), requiredLen));
}
}
@@ -90,7 +91,7 @@ public class ConvertUtil {
int availableBytes = (end-start);
if (availableBytes < getVIntSize(i)) {
throw new NumberFormatException("Expected " + getVIntSize(i) + " bytes but the buffer '"
- + StringFunctionUtil.toBinaryString(buffer, start, end) + "' has only "
+ + DrillStringUtils.toBinaryString(buffer, start, end) + "' has only "
+ availableBytes + " bytes.");
}
buffer.writerIndex(start);
@@ -150,7 +151,7 @@ public class ConvertUtil {
return firstByte;
} else if (availableBytes < len) {
throw new NumberFormatException("Expected " + len + " bytes but the buffer '"
- + StringFunctionUtil.toBinaryString(buffer, start, end) + "' has "
+ + DrillStringUtils.toBinaryString(buffer, start, end) + "' has "
+ availableBytes + " bytes.");
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index 38cd530..68f4550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.util;
import java.util.List;
import org.apache.commons.lang.StringUtils;
+import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
@@ -84,7 +86,8 @@ public class VectorUtil {
int columnWidth = getColumnWidth(columnWidths, columnIndex);
width += columnWidth + 2;
formats.add("| %-" + columnWidth + "s");
- columns.add(vw.getValueVector().getField().getPath().getAsUnescapedPath());
+ MaterializedField field = vw.getValueVector().getField();
+ columns.add(field.getPath().getAsUnescapedPath() + "<" + field.getType().getMinorType() + ">");
columnIndex++;
}
@@ -107,19 +110,13 @@ public class VectorUtil {
for (VectorWrapper<?> vw : va) {
int columnWidth = getColumnWidth(columnWidths, columnIndex);
Object o = vw.getValueVector().getAccessor().getObject(row);
- if (o == null) {
- //null value
- System.out.printf(formats.get(columnIndex), "");
- }
- else if (o instanceof byte[]) {
- String value = new String((byte[]) o);
- System.out.printf(formats.get(columnIndex), value.length() <= columnWidth ? value : value.substring(0, columnWidth - 1));
- } else if (o instanceof List) {
- System.out.printf("| %s", o);
+ String cellString;
+ if (o instanceof byte[]) {
+ cellString = DrillStringUtils.toBinaryString((byte[]) o);
} else {
- String value = o.toString();
- System.out.printf(formats.get(columnIndex), value.length() <= columnWidth ? value : value.substring(0,columnWidth - 1));
+ cellString = DrillStringUtils.escapeNewLines(String.valueOf(o));
}
+ System.out.printf(formats.get(columnIndex), cellString.length() <= columnWidth ? cellString : cellString.substring(0, columnWidth - 1));
columnIndex++;
}
System.out.printf("|\n");
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index a819453..741323b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -100,8 +100,7 @@ public class PlanningBase extends ExecTest{
registry.init();
final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
final SchemaPlus root = Frameworks.createRootSchema(false);
- registry.getSchemaFactory().registerSchemas(new UserSession(null, null, null).setSupportComplexTypes(true), root);
-
+ registry.getSchemaFactory().registerSchemas(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), root);
new NonStrictExpectations() {
@@ -113,7 +112,7 @@ public class PlanningBase extends ExecTest{
context.getFunctionRegistry();
result = functionRegistry;
context.getSession();
- result = new UserSession(null, null, null).setSupportComplexTypes(true);
+ result = UserSession.Builder.newBuilder().setSupportComplexTypes(true).build();
context.getCurrentEndpoint();
result = DrillbitEndpoint.getDefaultInstance();
context.getActiveEndpoints();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index f6200f0..a686fa9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -110,7 +110,7 @@ public class TestOptiqPlans extends ExecTest {
};
RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
- QueryContext qc = new QueryContext(new UserSession(null, null, null).setSupportComplexTypes(true), QueryId.getDefaultInstance(), bitContext);
+ QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), QueryId.getDefaultInstance(), bitContext);
PhysicalPlanReader reader = bitContext.getPlanReader();
LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index afaaa6d..f19294f 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -98,6 +98,7 @@
<inherited>true</inherited>
<configuration>
<excludes>
+ <exclude>**/.checkstyle</exclude>
<exclude>**/.buildpath</exclude>
<exclude>**/*.json</exclude>
<exclude>**/git.properties</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index faeba6f..2f4a58c 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -455,9 +455,9 @@ public final class UserBitShared {
*/
INFO_SCHEMA_SUB_SCAN(30, 30),
/**
- * <code>FLATTEN = 31;</code>
+ * <code>COMPLEX_TO_JSON = 31;</code>
*/
- FLATTEN(31, 31),
+ COMPLEX_TO_JSON(31, 31),
;
/**
@@ -585,9 +585,9 @@ public final class UserBitShared {
*/
public static final int INFO_SCHEMA_SUB_SCAN_VALUE = 30;
/**
- * <code>FLATTEN = 31;</code>
+ * <code>COMPLEX_TO_JSON = 31;</code>
*/
- public static final int FLATTEN_VALUE = 31;
+ public static final int COMPLEX_TO_JSON_VALUE = 31;
public final int getNumber() { return value; }
@@ -625,7 +625,7 @@ public final class UserBitShared {
case 28: return TEXT_SUB_SCAN;
case 29: return JSON_SUB_SCAN;
case 30: return INFO_SCHEMA_SUB_SCAN;
- case 31: return FLATTEN;
+ case 31: return COMPLEX_TO_JSON;
default: return null;
}
}
@@ -16530,7 +16530,7 @@ public final class UserBitShared {
"\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen" +
"tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI",
"ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE" +
- "LLED\020\004\022\n\n\006FAILED\020\005*\362\004\n\020CoreOperatorType\022" +
+ "LLED\020\004\022\n\n\006FAILED\020\005*\372\004\n\020CoreOperatorType\022" +
"\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" +
"\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" +
"_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" +
@@ -16545,9 +16545,9 @@ public final class UserBitShared {
"_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQ" +
"UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" +
"T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" +
- "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\013\n\007FL" +
- "ATTEN\020\037B.\n\033org.apache.drill.exec.protoB\r" +
- "UserBitSharedH\001"
+ "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017CO" +
+ "MPLEX_TO_JSON\020\037B.\n\033org.apache.drill.exec" +
+ ".protoB\rUserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index abd7b78..0485a95 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -53,7 +53,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
TEXT_SUB_SCAN(28),
JSON_SUB_SCAN(29),
INFO_SCHEMA_SUB_SCAN(30),
- FLATTEN(31);
+ COMPLEX_TO_JSON(31);
public final int number;
@@ -102,7 +102,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
case 28: return TEXT_SUB_SCAN;
case 29: return JSON_SUB_SCAN;
case 30: return INFO_SCHEMA_SUB_SCAN;
- case 31: return FLATTEN;
+ case 31: return COMPLEX_TO_JSON;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index eb56efb..fc6f1b5 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -198,5 +198,5 @@ enum CoreOperatorType {
TEXT_SUB_SCAN = 28;
JSON_SUB_SCAN = 29;
INFO_SCHEMA_SUB_SCAN = 30;
- FLATTEN = 31;
+ COMPLEX_TO_JSON = 31;
}