You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/18 06:22:12 UTC

[02/13] git commit: DRILL-836: [addendum] Drill needs to return complex types (e.g., map and array) as a JSON string

DRILL-836: [addendum] Drill needs to return complex types (e.g., map and array) as a JSON string

* This contains additional changes to the original patch which was merged.
+ Renamed "flatten" to "complex-to-json"
+ With the new patch, we return VARCHAR instead of VARBINARY.
+ Added test case.
+ Minor code re-factoring.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2e07b0b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2e07b0b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2e07b0b8

Branch: refs/heads/master
Commit: 2e07b0b8ab4fe7fdb1ad51ee47ab10aeec94b7f9
Parents: 4d17aea
Author: Aditya Kishore <ad...@maprtech.com>
Authored: Mon Jun 16 15:01:11 2014 -0700
Committer: Aditya Kishore <ad...@maprtech.com>
Committed: Tue Jun 17 00:06:41 2014 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../drill/common/util/DrillStringUtils.java     | 176 +++++++++++++++++++
 .../org/apache/drill/hbase/BaseHBaseTest.java   |  28 ++-
 .../drill/hbase/HBaseRecordReaderTest.java      |   6 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   6 +-
 .../drill/hbase/TestHBaseCFAsJSONString.java    |  54 ++++++
 .../drill/hbase/TestHBaseFilterPushDown.java    |  12 +-
 .../drill/hbase/TestHBaseProjectPushDown.java   |   8 +-
 .../resources/bootstrap-storage-plugins.json    |   1 +
 .../src/main/codegen/includes/vv_imports.ftl    |   1 +
 .../main/codegen/templates/SqlAccessors.java    |   2 +-
 .../apache/drill/exec/client/DrillClient.java   |  16 +-
 .../exec/expr/fn/impl/StringFunctionUtil.java   |  71 --------
 .../exec/expr/fn/impl/StringFunctions.java      |   2 +-
 .../exec/physical/config/ComplexToJson.java     |  59 +++++++
 .../drill/exec/physical/config/Flatten.java     |  59 -------
 .../impl/project/ComplexToJsonBatchCreator.java |  42 +++++
 .../impl/project/FlattenBatchCreator.java       |  42 -----
 .../impl/project/ProjectBatchCreator.java       |   3 +-
 .../impl/project/ProjectRecordBatch.java        |  62 ++++---
 .../planner/fragment/SimpleParallelizer.java    |   6 +-
 .../planner/physical/ComplexToJsonPrel.java     |  69 ++++++++
 .../exec/planner/physical/FlattenPrel.java      |  61 -------
 .../visitor/ComplexToJsonPrelVisitor.java       |  40 +++++
 .../physical/visitor/FlattenPrelVisitor.java    |  40 -----
 .../planner/sql/handlers/DefaultSqlHandler.java |   6 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  |   4 +-
 .../org/apache/drill/exec/rpc/RpcException.java |   4 +-
 .../apache/drill/exec/rpc/user/UserClient.java  |  20 ++-
 .../apache/drill/exec/rpc/user/UserServer.java  |   8 +-
 .../apache/drill/exec/rpc/user/UserSession.java |  65 +++++--
 .../org/apache/drill/exec/util/ConvertUtil.java |   7 +-
 .../org/apache/drill/exec/util/VectorUtil.java  |  21 +--
 .../java/org/apache/drill/PlanningBase.java     |   5 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   2 +-
 exec/jdbc/pom.xml                               |   1 +
 .../apache/drill/exec/proto/UserBitShared.java  |  18 +-
 .../exec/proto/beans/CoreOperatorType.java      |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   2 +-
 39 files changed, 645 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1092a7d..838ea6b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 .project
 .buildpath
 .classpath
+.checkstyle
 .settings/
 .idea/
 *.log

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
new file mode 100644
index 0000000..96b5776
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+
+import io.netty.buffer.ByteBuf;
+
+public class DrillStringUtils {
+  /**
+   * Unescapes any Java literals found in the {@code String}.
+   * For example, it will turn a sequence of {@code '\'} and
+   * {@code 'n'} into a newline character, unless the {@code '\'}
+   * is preceded by another {@code '\'}.
+   *
+   * @param input  the {@code String} to unescape, may be null
+   * @return a new unescaped {@code String}, {@code null} if null string input
+   */
+  public static final String unescapeJava(String input) {
+    return StringEscapeUtils.unescapeJava(input);
+  }
+
+  /**
+   * Escapes the characters in a {@code String} using Java String rules.
+   *
+   * Deals correctly with quotes and control-chars (tab, backslash, cr, ff, etc.)
+   *
+   * So a tab becomes the characters {@code '\\'} and
+   * {@code 't'}.
+   *
+   * Example:
+   * <pre>
+   * input string: He didn't say, "Stop!"
+   * output string: He didn't say, \"Stop!\"
+   * </pre>
+   *
+   * @param input  String to escape values in, may be null
+   * @return String with escaped values, {@code null} if null string input
+   */
+  public static final String escapeJava(String input) {
+    return StringEscapeUtils.escapeJava(input);
+  }
+
+  public static final String escapeNewLines(String input) {
+    if (input == null) {
+      return null;
+    }
+    StringBuilder result = new StringBuilder();
+    boolean sawNewline = false;
+    for (int i = 0; i < input.length(); i++) {
+      char curChar = input.charAt(i);
+      if (curChar == '\r' || curChar == '\n') {
+        if (sawNewline) {
+          continue;
+        }
+        sawNewline = true;
+        result.append("\\n");
+      } else {
+        sawNewline = false;
+        result.append(curChar);
+      }
+    }
+    return result.toString();
+  }
+
+  /**
+   * Return a printable representation of a byte buffer, escaping the non-printable
+   * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
+   *
+   * This function does not modify  the {@code readerIndex} and {@code writerIndex}
+   * of the byte buffer.
+   */
+  public static String toBinaryString(ByteBuf buf, int strStart, int strEnd) {
+    StringBuilder result = new StringBuilder();
+    for (int i = strStart; i < strEnd ; ++i) {
+      appendByte(result, buf.getByte(i));
+    }
+    return result.toString();
+  }
+
+  /**
+   * Return a printable representation of a byte array, escaping the non-printable
+   * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
+   */
+  public static String toBinaryString(byte[] buf) {
+    return toBinaryString(buf, 0, buf.length);
+  }
+
+  /**
+   * Return a printable representation of a byte array, escaping the non-printable
+   * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
+   */
+  public static String toBinaryString(byte[] buf, int strStart, int strEnd) {
+    StringBuilder result = new StringBuilder();
+    for (int i = strStart; i < strEnd ; ++i) {
+      appendByte(result, buf[i]);
+    }
+    return result.toString();
+  }
+
+  private static void appendByte(StringBuilder result, byte b) {
+    int ch = b & 0xFF;
+    if (   (ch >= '0' && ch <= '9')
+        || (ch >= 'A' && ch <= 'Z')
+        || (ch >= 'a' && ch <= 'z')
+        || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) {
+        result.append((char)ch);
+    } else {
+      result.append(String.format("\\x%02X", ch));
+    }
+  }
+
+  /**
+   * In-place parsing of a hex encoded binary string.
+   *
+   * This function does not modify  the {@code readerIndex} and {@code writerIndex}
+   * of the byte buffer.
+   *
+   * @return Index in the byte buffer just after the last written byte.
+   */
+  public static int parseBinaryString(ByteBuf str, int strStart, int strEnd) {
+    int length = (strEnd - strStart);
+    int dstEnd = strStart;
+    for (int i = strStart; i < length ; i++) {
+      byte b = str.getByte(i);
+      if (b == '\\'
+          && length > i+3
+          && (str.getByte(i+1) == 'x' || str.getByte(i+1) == 'X')) {
+        // ok, take next 2 hex digits.
+        byte hd1 = str.getByte(i+2);
+        byte hd2 = str.getByte(i+3);
+        if (isHexDigit(hd1) && isHexDigit(hd2)) { // [a-fA-F0-9]
+          // turn hex ASCII digit -> number
+          b = (byte) ((toBinaryFromHex(hd1) << 4) + toBinaryFromHex(hd2));
+          i += 3; // skip 3
+        }
+      }
+      str.setByte(dstEnd++, b);
+    }
+    return dstEnd;
+  }
+
+  /**
+   * Takes a ASCII digit in the range A-F0-9 and returns
+   * the corresponding integer/ordinal value.
+   * @param ch  The hex digit.
+   * @return The converted hex value as a byte.
+   */
+  private static byte toBinaryFromHex(byte ch) {
+    if ( ch >= 'A' && ch <= 'F' )
+      return (byte) ((byte)10 + (byte) (ch - 'A'));
+    else if ( ch >= 'a' && ch <= 'f' )
+      return (byte) ((byte)10 + (byte) (ch - 'a'));
+    return (byte) (ch - '0');
+  }
+
+  private static boolean isHexDigit(byte c) {
+    return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9');
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index dbeced3..e6a5474 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -42,6 +42,8 @@ import com.google.common.io.Files;
 
 public class BaseHBaseTest extends BaseTestQuery {
 
+  private static final String HBASE_STORAGE_PLUGIN_NAME = "hbase";
+
   protected static Configuration conf = HBaseConfiguration.create();
 
   protected static HBaseStoragePlugin storagePlugin;
@@ -66,10 +68,11 @@ public class BaseHBaseTest extends BaseTestQuery {
     HBaseTestsSuite.configure(true, true);
     HBaseTestsSuite.initCluster();
 
-    storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin("hbase");
+    storagePlugin = (HBaseStoragePlugin) bit.getContext().getStorage().getPlugin(HBASE_STORAGE_PLUGIN_NAME);
     storagePluginConfig = storagePlugin.getConfig();
-
+    storagePluginConfig.setEnabled(true);
     storagePluginConfig.setZookeeperPort(HBaseTestsSuite.getZookeeperPort());
+    bit.getContext().getStorage().createOrUpdate(HBASE_STORAGE_PLUGIN_NAME, storagePluginConfig, true);
   }
 
   @AfterClass
@@ -91,20 +94,24 @@ public class BaseHBaseTest extends BaseTestQuery {
         .replace("[TABLE_NAME]", tableName);
   }
 
-  protected void runPhysicalVerifyCount(String planFile, String tableName, int expectedRowCount) throws Exception{
+  protected void runHBasePhysicalVerifyCount(String planFile, String tableName, int expectedRowCount) throws Exception{
     String physicalPlan = getPlanText(planFile, tableName);
     List<QueryResultBatch> results = testPhysicalWithResults(physicalPlan);
     printResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  protected void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
-    sql = canonizeSQL(sql);
+  protected List<QueryResultBatch> runHBaseSQLlWithResults(String sql) throws Exception {
+    sql = canonizeHBaseSQL(sql);
     System.out.println("Running query:\n" + sql);
-    List<QueryResultBatch> results = testSqlWithResults(sql);
+    return testSqlWithResults(sql);
+  }
+
+  protected void runHBaseSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
+    List<QueryResultBatch> results = runHBaseSQLlWithResults(sql);
     printResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException {
+  protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException {
     int rowCount = 0;
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
     for(QueryResultBatch result : results){
@@ -118,12 +125,17 @@ public class BaseHBaseTest extends BaseTestQuery {
       result.release();
     }
     System.out.println("Total record count: " + rowCount);
+    return rowCount;
+  }
+
+  private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException {
+    int rowCount = printResult(results);
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);
     }
   }
 
-  protected String canonizeSQL(String sql) {
+  protected String canonizeHBaseSQL(String sql) {
     return sql.replace("[TABLE_NAME]", HBaseTestsSuite.TEST_TABLE_1);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
index 1462b81..60db266 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
@@ -24,19 +24,19 @@ public class HBaseRecordReaderTest extends BaseHBaseTest {
   @Test
   public void testLocalDistributed() throws Exception {
     String planName = "/hbase/hbase_scan_screen_physical.json";
-    runPhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 6);
+    runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 6);
   }
 
   @Test
   public void testLocalDistributedColumnSelect() throws Exception {
     String planName = "/hbase/hbase_scan_screen_physical_column_select.json";
-    runPhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 2);
+    runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 2);
   }
 
   @Test
   public void testLocalDistributedFamilySelect() throws Exception {
     String planName = "/hbase/hbase_scan_screen_physical_family_select.json";
-    runPhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 3);
+    runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 3);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index a24215d..18cf87c 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -38,7 +38,8 @@ import org.junit.runners.Suite.SuiteClasses;
   TestHBaseFilterPushDown.class,
   TestHBaseProjectPushDown.class,
   TestHBaseRegionScanAssignments.class,
-  TestHBaseTableProvider.class
+  TestHBaseTableProvider.class,
+  TestHBaseCFAsJSONString.class
 })
 public class HBaseTestsSuite {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseTestsSuite.class);
@@ -55,6 +56,9 @@ public class HBaseTestsSuite {
 
   private static volatile AtomicInteger initCount = new AtomicInteger(0);
 
+  /**
+   * This flag controls whether {@link HBaseTestsSuite} starts a mini HBase cluster to run the unit test.
+   */
   private static boolean manageHBaseCluster = System.getProperty("drill.hbase.tests.manageHBaseCluster", "true").equalsIgnoreCase("true");
   private static boolean hbaseClusterCreated = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
new file mode 100644
index 0000000..9cc0356
--- /dev/null
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHBaseCFAsJSONString extends BaseHBaseTest {
+
+  private static DrillClient parent_client;
+
+  @BeforeClass
+  public static void openMyClient() throws Exception {
+    parent_client = client;
+    client = new DrillClient(config, serviceSet.getCoordinator());
+    client.setSupportComplexTypes(false);
+    client.connect();
+  }
+
+  @AfterClass
+  public static void closeClient() throws IOException {
+    if(client != null) client.close();
+    client = parent_client;
+  }
+
+  @Test
+  public void testColumnFamiliesAsJSONString() throws Exception {
+    setColumnWidths(new int[] {112, 12});
+    List<QueryResultBatch> resultList = runHBaseSQLlWithResults("SELECT f, f2 FROM hbase.`[TABLE_NAME]` tableName LIMIT 1");
+    printResult(resultList);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 50dddeb..29e7033 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -24,7 +24,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   @Test
   public void testFilterPushDownRowKeyEqual() throws Exception {
     setColumnWidths(new int[] {8, 38, 38});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
@@ -36,7 +36,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   @Test
   public void testFilterPushDownRowKeyGreaterThan() throws Exception {
     setColumnWidths(new int[] {8, 38, 38});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
@@ -48,7 +48,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   @Test
   public void testFilterPushDownRowKeyBetween() throws Exception {
     setColumnWidths(new int[] {8, 74, 38});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
@@ -60,7 +60,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   @Test
   public void testFilterPushDownMultiColumns() throws Exception {
     setColumnWidths(new int[] {8, 74, 38});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` t\n"
@@ -72,7 +72,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   @Test
   public void testFilterPushDownConvertExpression() throws Exception {
     setColumnWidths(new int[] {8, 38, 38});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"
@@ -84,7 +84,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   @Test
   public void testFilterPushDownRowKeyLessThanOrEqualTo() throws Exception {
     setColumnWidths(new int[] {8, 74, 38});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "  *\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName\n"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index ce6f865..6efe905 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -24,7 +24,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   @Test
   public void testRowKeyPushDown() throws Exception{
     setColumnWidth(8);
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "row_key\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
@@ -34,7 +34,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   @Test
   public void testColumnWith1RowPushDown() throws Exception{
     setColumnWidth(6);
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "t.f2.c7 as `t.f2.c7`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` t"
@@ -44,7 +44,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   @Test
   public void testRowKeyAndColumnPushDown() throws Exception{
     setColumnWidths(new int[] {8, 9, 6, 2, 6});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "row_key, t.f.c1*31 as `t.f.c1*31`, t.f.c2 as `t.f.c2`, 5 as `5`, 'abc' as `'abc'`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` t"
@@ -54,7 +54,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   @Test
   public void testColumnFamilyPushDown() throws Exception{
     setColumnWidths(new int[] {8, 74, 38});
-    runSQLVerifyCount("SELECT\n"
+    runHBaseSQLVerifyCount("SELECT\n"
         + "row_key, f, f2\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
index 0e93f7e..3e0e8c0 100644
--- a/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
@@ -2,6 +2,7 @@
   "storage":{
     hbase : {
       type:"hbase",
+      enabled: false,
       config : {
         "hbase.zookeeper.quorum" : "localhost",
         "hbase.zookeeper.property.clientPort" : 2181

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index 9e16116..872c0b8 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -33,6 +33,7 @@ import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.types.TypeProtos.*;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.vector.complex.*;
 import org.apache.drill.exec.vector.complex.reader.*;
 import org.apache.drill.exec.vector.complex.impl.*;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/SqlAccessors.java b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
index d05b7fd..a5b251e 100644
--- a/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
+++ b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
@@ -69,7 +69,7 @@ public class ${name}Accessor extends AbstractSqlAccessor{
     <#case "VarBinary">
     public String getString(int index) {
       byte [] b = ac.get(index);
-      return StringFunctionUtil.toBinaryString(io.netty.buffer.Unpooled.wrappedBuffer(b), 0, b.length);
+      return DrillStringUtils.toBinaryString(io.netty.buffer.Unpooled.wrappedBuffer(b), 0, b.length);
     }
       <#break>
     <#case "VarChar">

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 7dc7702..7d7f2ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -72,6 +72,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   private final BufferAllocator allocator;
   private int reconnectTimes;
   private int reconnectDelay;
+  private boolean supportComplexTypes = true;
   private final boolean ownsZkConnection;
   private final boolean ownsAllocator;
 
@@ -110,7 +111,18 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     client.setAutoRead(enableAutoRead);
   }
 
-
+  /**
+   * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set.
+   * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
+   *
+   * @throws IllegalStateException if called after a connection has been established.
+   */
+  public void setSupportComplexTypes(boolean supportComplexTypes) {
+    if (connected) {
+      throw new IllegalStateException("Attempted to modify client connection property after connection has been established.");
+    }
+    this.supportComplexTypes = supportComplexTypes;
+  }
 
   /**
    * Connects the client to a Drillbit server
@@ -181,7 +193,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   private void connect(DrillbitEndpoint endpoint) throws RpcException {
     FutureHandler f = new FutureHandler();
     try {
-      client.connect(f, endpoint, props);
+      client.setSupportComplexTypes(supportComplexTypes).connect(f, endpoint, props);
       f.checkedGet();
     } catch (InterruptedException e) {
       throw new RpcException(e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
index fbdab8e..844a3e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionUtil.java
@@ -72,77 +72,6 @@ public class StringFunctionUtil {
     return -1;
   }
 
-  /**
-   * Return a printable representation of a byte buffer, escaping the non-printable
-   * bytes as '\\xNN' where NN is the hexadecimal representation of such bytes.
-   *
-   * This function does not modify  the {@code readerIndex} and {@code writerIndex}
-   * of the byte buffer.
-   */
-  public static String toBinaryString(ByteBuf buf, int strStart, int strEnd) {
-    StringBuilder result = new StringBuilder();
-    for (int i = strStart; i < strEnd ; ++i) {
-      int ch = buf.getByte(i) & 0xFF;
-      if ( (ch >= '0' && ch <= '9')
-          || (ch >= 'A' && ch <= 'Z')
-          || (ch >= 'a' && ch <= 'z')
-          || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) {
-          result.append((char)ch);
-      } else {
-        result.append(String.format("\\x%02X", ch));
-      }
-    }
-    return result.toString();
-  }
-
-  /**
-   * In-place parsing of a hex encoded binary string.
-   *
-   * This function does not modify  the {@code readerIndex} and {@code writerIndex}
-   * of the byte buffer.
-   *
-   * @return Index in the byte buffer just after the last written byte.
-   */
-  public static int parseBinaryString(ByteBuf str, int strStart, int strEnd) {
-    int length = (strEnd - strStart);
-    int dstEnd = strStart;
-    for (int i = strStart; i < length ; i++) {
-      byte b = str.getByte(i);
-      if (b == '\\'
-          && length > i+3
-          && (str.getByte(i+1) == 'x' || str.getByte(i+1) == 'X')) {
-        // ok, take next 2 hex digits.
-        byte hd1 = str.getByte(i+2);
-        byte hd2 = str.getByte(i+3);
-        if (isHexDigit(hd1) && isHexDigit(hd2)) { // [a-fA-F0-9]
-          // turn hex ASCII digit -> number
-          b = (byte) ((toBinaryFromHex(hd1) << 4) + toBinaryFromHex(hd2));
-          i += 3; // skip 3
-        }
-      }
-      str.setByte(dstEnd++, b);
-    }
-    return dstEnd;
-  }
-
-  /**
-   * Takes a ASCII digit in the range A-F0-9 and returns
-   * the corresponding integer/ordinal value.
-   * @param ch  The hex digit.
-   * @return The converted hex value as a byte.
-   */
-  private static byte toBinaryFromHex(byte ch) {
-    if ( ch >= 'A' && ch <= 'F' )
-      return (byte) ((byte)10 + (byte) (ch - 'A'));
-    else if ( ch >= 'a' && ch <= 'f' )
-      return (byte) ((byte)10 + (byte) (ch - 'a'));
-    return (byte) (ch - '0');
-  }
-
-  private static boolean isHexDigit(byte c) {
-    return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9');
-  }
-
   private static int utf8CharLen(ByteBuf buffer, int idx) {
     byte firstByte = buffer.getByte(idx);
     if (firstByte >= 0) { // 1-byte char. First byte is 0xxxxxxx.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
index 51a7dbb..33f2c94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java
@@ -928,7 +928,7 @@ public class StringFunctions{
     public void eval() {
       out.buffer = in.buffer;
       out.start = in.start;
-      out.end = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.parseBinaryString(in.buffer, in.start, in.end);
+      out.end = org.apache.drill.common.util.DrillStringUtils.parseBinaryString(in.buffer, in.start, in.end);
       out.buffer.readerIndex(out.start);
       out.buffer.writerIndex(out.end);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java
new file mode 100644
index 0000000..480f84e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ComplexToJson.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("complex-to-json")
+public class ComplexToJson extends AbstractSingle {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJson.class);
+
+  @JsonCreator
+  public ComplexToJson(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitOp(this, value);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new ComplexToJson(child);
+  }
+
+  @Override
+  public SelectionVectorMode getSVMode() {
+    return child.getSVMode();
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.COMPLEX_TO_JSON_VALUE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
deleted file mode 100644
index d123d2b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Flatten.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.config;
-
-import org.apache.drill.exec.physical.base.AbstractSingle;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("flatten")
-public class Flatten extends AbstractSingle {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Flatten.class);
-
-  @JsonCreator
-  public Flatten(@JsonProperty("child") PhysicalOperator child) {
-    super(child);
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
-    return physicalVisitor.visitOp(this, value);
-  }
-
-  @Override
-  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new Flatten(child);
-  }
-
-  @Override
-  public SelectionVectorMode getSVMode() {
-    return child.getSVMode();
-  }
-
-  @Override
-  public int getOperatorType() {
-    return CoreOperatorType.FLATTEN_VALUE;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
new file mode 100644
index 0000000..0df9491
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.ComplexToJson;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.size() == 1);
+    return new ProjectRecordBatch(new Project(null, flatten.getChild()),
+                                  children.iterator().next(),
+                                  context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
deleted file mode 100644
index 9bea73c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/FlattenBatchCreator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.project;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Flatten;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
-
-import com.google.common.base.Preconditions;
-
-public class FlattenBatchCreator implements BatchCreator<Flatten> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
-
-  @Override
-  public RecordBatch getBatch(FragmentContext context, Flatten flatten, List<RecordBatch> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.size() == 1);
-    return new ProjectRecordBatch(new Project(null, flatten.getChild()),
-                                  children.iterator().next(),
-                                  context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
index 929071d..cb1d4f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
@@ -35,6 +35,5 @@ public class ProjectBatchCreator implements BatchCreator<Project>{
     Preconditions.checkArgument(children.size() == 1);
     return new ProjectRecordBatch(config, children.iterator().next(), context);
   }
-  
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index f5a4444..5ee01f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -25,12 +25,16 @@ import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.FunctionCallFactory;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.fn.CastFunctions;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -98,9 +102,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
   protected void doWork() {
 //    VectorUtil.showVectorAccessibleContent(incoming, ",");
     int incomingRecordCount = incoming.getRecordCount();
-    
+
     doAlloc();
-    
+
     int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
     if (outputRecords < incomingRecordCount) {
       setValueCount(outputRecords);
@@ -114,8 +118,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
       }
       this.recordCount = outputRecords;
     }
-    // In case of complex writer expression, vectors would be added to batch run-time. 
-    // We have to re-build the schema. 
+    // In case of complex writer expression, vectors would be added to batch run-time.
+    // We have to re-build the schema.
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
@@ -138,17 +142,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
       }
       this.recordCount = remainingRecordCount;
     }
-    // In case of complex writer expression, vectors would be added to batch run-time. 
-    // We have to re-build the schema. 
+    // In case of complex writer expression, vectors would be added to batch run-time.
+    // We have to re-build the schema.
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
-    }   
+    }
   }
 
   public void addComplexWriter(ComplexWriter writer) {
     complexWriters.add(writer);
   }
-  
+
   private boolean doAlloc() {
     //Allocate vv in the allocationVectors.
     for(ValueVector v : this.allocationVectors){
@@ -156,17 +160,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
       if (!v.allocateNewSafe())
         return false;
     }
-    
+
     //Allocate vv for complexWriters.
     if (complexWriters == null)
       return true;
-    
+
     for (ComplexWriter writer : complexWriters)
       writer.allocate();
-    
+
     return true;
   }
-  
+
   private void setValueCount(int count) {
     for(ValueVector v : allocationVectors){
       ValueVector.Mutator m = v.getMutator();
@@ -177,9 +181,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
       return;
 
     for (ComplexWriter writer : complexWriters)
-      writer.setValueCount(count);    
+      writer.setValueCount(count);
   }
-  
+
   /** hack to make ref and full work together... need to figure out if this is still necessary. **/
   private FieldReference getRef(NamedExpression e){
     FieldReference ref = e.getRef();
@@ -259,16 +263,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
           container.add(tp.getTo());
           transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
 //          logger.debug("Added transfer.");
-        } else if (expr instanceof DrillFuncHolderExpr && 
-                  ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder())  {  
-          // Need to process ComplexWriter function evaluation. 
-          // Lazy initialization of the list of complex writers, if not done yet. 
+        } else if (expr instanceof DrillFuncHolderExpr &&
+                  ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder())  {
+          // Need to process ComplexWriter function evaluation.
+          // Lazy initialization of the list of complex writers, if not done yet.
           if (complexWriters == null)
             complexWriters = Lists.newArrayList();
-         
-          // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. 
+
+          // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
           ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
-          cg.addExpr(expr);          
+          cg.addExpr(expr);
         } else{
           // need to do evaluation.
           ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -303,10 +307,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
     List<NamedExpression> exprs = Lists.newArrayList();
     for (MaterializedField field : incoming.getSchema()) {
       if (Types.isComplex(field.getType())) {
-        exprs.add(new NamedExpression(
-            FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN),
-            new FieldReference(field.getPath()))
-            );
+        LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
+        String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
+        List<LogicalExpression> castArgs = Lists.newArrayList();
+        castArgs.add(convertToJson);  //input_expr
+        /*
+         * We are implicitly casting to VARCHAR so we don't have a max length,
+         * using an arbitrary value. We trim down the size of the stored bytes
+         * to the actual size so this size doesn't really matter.
+         */
+        castArgs.add(new ValueExpressions.LongExpression(65536, null)); //
+        FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
+        exprs.add(new NamedExpression(castCall, new FieldReference(field.getPath())));
       } else {
         exprs.add(new NamedExpression(field.getPath(), new FieldReference(field.getPath())));
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 053580b..0ce480d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.planner.fragment;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.expr.fn.impl.DateUtility;
@@ -158,11 +158,11 @@ public class SimpleParallelizer {
             .build();
 
         if (isRootNode) {
-          logger.debug("Root fragment:\n {}", StringEscapeUtils.unescapeJava(fragment.toString()));
+          logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
           rootFragment = fragment;
           rootOperator = root;
         } else {
-          logger.debug("Remote fragment:\n {}", StringEscapeUtils.unescapeJava(fragment.toString()));
+          logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
           fragments.add(fragment);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java
new file mode 100644
index 0000000..b6bedb6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ComplexToJsonPrel.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ComplexToJson;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class ComplexToJsonPrel extends SingleRel implements Prel {
+
+  public ComplexToJsonPrel(Prel phyRelNode) {
+    super(phyRelNode.getCluster(), phyRelNode.getTraitSet(), phyRelNode);
+  }
+
+  @Override 
+  public final RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new ComplexToJsonPrel((Prel) sole(inputs));
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    ComplexToJson p = new ComplexToJson(((Prel) getChild()).getPhysicalOperator(creator));
+    return creator.addMetadata(this, p);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
deleted file mode 100644
index 3c19bae..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FlattenPrel.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.physical;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.Flatten;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.eigenbase.rel.SingleRel;
-
-public class FlattenPrel extends SingleRel implements Prel {
-
-  public FlattenPrel(Prel phyRelNode) {
-    super(phyRelNode.getCluster(), phyRelNode.getTraitSet(), phyRelNode);
-  }
-
-  @Override
-  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
-    Flatten p = new Flatten(((Prel) getChild()).getPhysicalOperator(creator));
-    return creator.addMetadata(this, p);
-  }
-
-  @Override
-  public Iterator<Prel> iterator() {
-    return PrelUtil.iter(getChild());
-  }
-
-  @Override
-  public SelectionVectorMode[] getSupportedEncodings() {
-    return SelectionVectorMode.DEFAULT;
-  }
-
-  @Override
-  public SelectionVectorMode getEncoding() {
-    return SelectionVectorMode.NONE;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java
new file mode 100644
index 0000000..37b2f8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ComplexToJsonPrelVisitor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import java.util.Collections;
+
+import org.apache.drill.exec.planner.physical.ComplexToJsonPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.eigenbase.rel.RelNode;
+
+public class ComplexToJsonPrelVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
+
+  private static final ComplexToJsonPrelVisitor INSTANCE = new ComplexToJsonPrelVisitor();
+
+  public static Prel addComplexToJsonPrel(Prel prel) {
+    return prel.accept(INSTANCE, null);
+  }
+
+  @Override
+  public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
+    return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)new ComplexToJsonPrel((Prel)prel.getChild())));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
deleted file mode 100644
index 5892782..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/FlattenPrelVisitor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.physical.visitor;
-
-import java.util.Collections;
-
-import org.apache.drill.exec.planner.physical.FlattenPrel;
-import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ScreenPrel;
-import org.eigenbase.rel.RelNode;
-
-public class FlattenPrelVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
-
-  private static final FlattenPrelVisitor INSTANCE = new FlattenPrelVisitor();
-
-  public static Prel addFlattenPrel(Prel prel) {
-    return prel.accept(INSTANCE, null);
-  }
-
-  @Override
-  public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
-    return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)new FlattenPrel((Prel)prel.getChild())));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 78dadbf..21420df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -44,7 +44,7 @@ import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
 import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
 import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
-import org.apache.drill.exec.planner.physical.visitor.FlattenPrelVisitor;
+import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
 import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor;
@@ -182,8 +182,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      * insert a project which which would convert
      */
     if (!context.getSession().isSupportComplexTypes()) {
-      logger.debug("Client does not support complex types, add Flatten operator.");
-      phyRelNode = FlattenPrelVisitor.addFlattenPrel(phyRelNode);
+      logger.debug("Client does not support complex types, add ComplexToJson operator.");
+      phyRelNode = ComplexToJsonPrelVisitor.addComplexToJsonPrel(phyRelNode);
     }
 
     /* 6.)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 2a3266a..562fa90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -90,7 +90,9 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
   public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
 
   public boolean isActive(){
-    return connection.getChannel().isActive() ;
+    return connection != null
+        && connection.getChannel() != null
+        && connection.getChannel().isActive() ;
   }
 
   protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
index 9b5eb1d..3d8f02b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.rpc;
 
 import java.util.concurrent.ExecutionException;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.drill.common.exceptions.DrillIOException;
+import org.apache.drill.common.util.DrillStringUtils;
 
 /**
  * Parent class for all rpc exceptions.
@@ -38,7 +38,7 @@ public class RpcException extends DrillIOException{
   }
 
   private static String format(String message) {
-    return StringEscapeUtils.unescapeJava(message);
+    return DrillStringUtils.unescapeJava(message);
   }
 
   public RpcException(String message) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 277bb0c..ad885f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -44,6 +44,8 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
 
   private final QueryResultHandler queryResultHandler = new QueryResultHandler();
 
+  private boolean supportComplexTypes = true;
+
   public UserClient(BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
     super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
   }
@@ -57,7 +59,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
     UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
         .setRpcVersion(UserRpcConfig.RPC_VERSION)
         .setSupportListening(true)
-        .setSupportComplexTypes(true);
+        .setSupportComplexTypes(supportComplexTypes);
 
     if (props != null) {
       hsBuilder.setProperties(props);
@@ -104,10 +106,24 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
   @Override
   protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithConnection.ServerConnection connection) {
   }
-  
+
   @Override
   public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
     return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
   }
 
+  /**
+   * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set.
+   * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
+   *
+   * @throws IllegalStateException if called after a connection has been established.
+   */
+  public UserClient setSupportComplexTypes(boolean supportComplexTypes) {
+    if (isActive()) {
+      throw new IllegalStateException("Attempted to modify connection property after connection has been established.");
+    }
+    this.supportComplexTypes = supportComplexTypes;
+    return this;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index e96ba6c..aaf3c2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -111,8 +111,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
     }
 
     void setUser(UserToBitHandshake inbound) throws IOException {
-      session = new UserSession(worker.getSystemOptions(), inbound.getCredentials(), inbound.getProperties());
-      session.setSupportComplexTypes(inbound.getSupportComplexTypes());
+      session = UserSession.Builder.newBuilder()
+          .withCredentials(inbound.getCredentials())
+          .withOptionManager(worker.getSystemOptions())
+          .withUserProperties(inbound.getProperties())
+          .setSupportComplexTypes(inbound.getSupportComplexTypes())
+          .build();
     }
 
     public UserSession getSession(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 18e365e..13414da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.rpc.user;
 
-import java.io.IOException;
 import java.util.Map;
 
 import net.hydromatic.optiq.SchemaPlus;
@@ -42,18 +41,56 @@ public class UserSession {
   private Map<String, String> properties;
   private OptionManager options;
 
-  public UserSession(OptionManager systemOptions, UserCredentials credentials, UserProperties properties) throws IOException{
-    this.credentials = credentials;
-    this.options = new SessionOptionManager(systemOptions);
-    this.properties = Maps.newHashMap();
+  public static class Builder {
+    UserSession userSession;
 
-    if (properties == null) return;
-    for (int i=0; i<properties.getPropertiesCount(); i++) {
-      Property prop = properties.getProperties(i);
-      this.properties.put(prop.getKey(), prop.getValue());
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    public Builder withCredentials(UserCredentials credentials) {
+      userSession.credentials = credentials;
+      return this;
+    }
+
+    public Builder withOptionManager(OptionManager systemOptions) {
+      userSession.options = new SessionOptionManager(systemOptions);
+      return this;
+    }
+
+    public Builder withUserProperties(UserProperties properties) {
+      userSession.properties = Maps.newHashMap();
+      if (properties != null) {
+        for (int i = 0; i < properties.getPropertiesCount(); i++) {
+          Property prop = properties.getProperties(i);
+          userSession.properties.put(prop.getKey(), prop.getValue());
+        }
+      }
+      return this;
+    }
+
+    public Builder setSupportComplexTypes(boolean supportComplexTypes) {
+      userSession.supportComplexTypes = supportComplexTypes;
+      return this;
+    }
+
+    public UserSession build() {
+      UserSession session = userSession;
+      userSession = null;
+      return session;
+    }
+
+    Builder() {
+      userSession = new UserSession();
     }
   }
 
+  private UserSession() { }
+
+  public boolean isSupportComplexTypes() {
+    return supportComplexTypes;
+  }
+
   public OptionManager getOptions(){
     return options;
   }
@@ -62,7 +99,6 @@ public class UserSession {
     return user;
   }
 
-
   /**
    * Update the schema path for the session.
    * @param fullPath The desired path to set to.
@@ -107,13 +143,4 @@ public class UserSession {
     return schema;
   }
 
-  public boolean isSupportComplexTypes() {
-    return supportComplexTypes;
-  }
-
-  public UserSession setSupportComplexTypes(boolean supportComplexType) {
-    this.supportComplexTypes = supportComplexType;
-    return this;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
index 750885c..ffcb7d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ConvertUtil.java
@@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
 
 import java.io.DataInput;
 
+import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.exec.expr.fn.impl.StringFunctionUtil;
 
 public class ConvertUtil {
@@ -45,7 +46,7 @@ public class ConvertUtil {
     int actualLen = (end - start);
     if (actualLen != requiredLen) {
       throw new IllegalArgumentException(String.format("Wrong length %d(%d-%d) in the buffer '%s', expected %d.",
-          actualLen, end, start, StringFunctionUtil.toBinaryString(buffer, start, end), requiredLen));
+          actualLen, end, start, DrillStringUtils.toBinaryString(buffer, start, end), requiredLen));
     }
   }
 
@@ -90,7 +91,7 @@ public class ConvertUtil {
       int availableBytes = (end-start);
       if (availableBytes < getVIntSize(i)) {
         throw new NumberFormatException("Expected " + getVIntSize(i) + " bytes but the buffer '"
-            + StringFunctionUtil.toBinaryString(buffer, start, end) + "' has only "
+            + DrillStringUtils.toBinaryString(buffer, start, end) + "' has only "
             + availableBytes + " bytes.");
       }
       buffer.writerIndex(start);
@@ -150,7 +151,7 @@ public class ConvertUtil {
         return firstByte;
       } else if (availableBytes < len) {
         throw new NumberFormatException("Expected " + len + " bytes but the buffer '"
-            + StringFunctionUtil.toBinaryString(buffer, start, end) + "' has  "
+            + DrillStringUtils.toBinaryString(buffer, start, end) + "' has  "
             + availableBytes + " bytes.");
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index 38cd530..68f4550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.util;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 
@@ -84,7 +86,8 @@ public class VectorUtil {
       int columnWidth = getColumnWidth(columnWidths, columnIndex);
       width += columnWidth + 2;
       formats.add("| %-" + columnWidth + "s");
-      columns.add(vw.getValueVector().getField().getPath().getAsUnescapedPath());
+      MaterializedField field = vw.getValueVector().getField();
+      columns.add(field.getPath().getAsUnescapedPath() + "<" + field.getType().getMinorType() + ">");
       columnIndex++;
     }
 
@@ -107,19 +110,13 @@ public class VectorUtil {
       for (VectorWrapper<?> vw : va) {
         int columnWidth = getColumnWidth(columnWidths, columnIndex);
         Object o = vw.getValueVector().getAccessor().getObject(row);
-        if (o == null) {
-          //null value
-          System.out.printf(formats.get(columnIndex), "");
-        }
-        else if (o instanceof byte[]) {
-          String value = new String((byte[]) o);
-          System.out.printf(formats.get(columnIndex), value.length() <= columnWidth ? value : value.substring(0, columnWidth - 1));
-        } else if (o instanceof List) {
-          System.out.printf("| %s", o);
+        String cellString;
+        if (o instanceof byte[]) {
+          cellString = DrillStringUtils.toBinaryString((byte[]) o);
         } else {
-          String value = o.toString();
-          System.out.printf(formats.get(columnIndex), value.length() <= columnWidth ? value : value.substring(0,columnWidth - 1));
+          cellString = DrillStringUtils.escapeNewLines(String.valueOf(o));
         }
+        System.out.printf(formats.get(columnIndex), cellString.length() <= columnWidth ? cellString : cellString.substring(0, columnWidth - 1));
         columnIndex++;
       }
       System.out.printf("|\n");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index a819453..741323b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -100,8 +100,7 @@ public class PlanningBase extends ExecTest{
     registry.init();
     final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config);
     final SchemaPlus root = Frameworks.createRootSchema(false);
-    registry.getSchemaFactory().registerSchemas(new UserSession(null, null, null).setSupportComplexTypes(true), root);
-
+    registry.getSchemaFactory().registerSchemas(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), root);
 
 
     new NonStrictExpectations() {
@@ -113,7 +112,7 @@ public class PlanningBase extends ExecTest{
         context.getFunctionRegistry();
         result = functionRegistry;
         context.getSession();
-        result = new UserSession(null, null, null).setSupportComplexTypes(true);
+        result = UserSession.Builder.newBuilder().setSupportComplexTypes(true).build();
         context.getCurrentEndpoint();
         result = DrillbitEndpoint.getDefaultInstance();
         context.getActiveEndpoints();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index f6200f0..a686fa9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -110,7 +110,7 @@ public class TestOptiqPlans extends ExecTest {
     };
     RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet();
     DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, cache, workBus, new LocalPStoreProvider(DrillConfig.create()));
-    QueryContext qc = new QueryContext(new UserSession(null, null, null).setSupportComplexTypes(true), QueryId.getDefaultInstance(), bitContext);
+    QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), QueryId.getDefaultInstance(), bitContext);
     PhysicalPlanReader reader = bitContext.getPlanReader();
     LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
     PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index afaaa6d..f19294f 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -98,6 +98,7 @@
         <inherited>true</inherited>
         <configuration>
           <excludes>
+            <exclude>**/.checkstyle</exclude>
             <exclude>**/.buildpath</exclude>
             <exclude>**/*.json</exclude>
             <exclude>**/git.properties</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index faeba6f..2f4a58c 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -455,9 +455,9 @@ public final class UserBitShared {
      */
     INFO_SCHEMA_SUB_SCAN(30, 30),
     /**
-     * <code>FLATTEN = 31;</code>
+     * <code>COMPLEX_TO_JSON = 31;</code>
      */
-    FLATTEN(31, 31),
+    COMPLEX_TO_JSON(31, 31),
     ;
 
     /**
@@ -585,9 +585,9 @@ public final class UserBitShared {
      */
     public static final int INFO_SCHEMA_SUB_SCAN_VALUE = 30;
     /**
-     * <code>FLATTEN = 31;</code>
+     * <code>COMPLEX_TO_JSON = 31;</code>
      */
-    public static final int FLATTEN_VALUE = 31;
+    public static final int COMPLEX_TO_JSON_VALUE = 31;
 
 
     public final int getNumber() { return value; }
@@ -625,7 +625,7 @@ public final class UserBitShared {
         case 28: return TEXT_SUB_SCAN;
         case 29: return JSON_SUB_SCAN;
         case 30: return INFO_SCHEMA_SUB_SCAN;
-        case 31: return FLATTEN;
+        case 31: return COMPLEX_TO_JSON;
         default: return null;
       }
     }
@@ -16530,7 +16530,7 @@ public final class UserBitShared {
       "\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen" +
       "tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI",
       "ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE" +
-      "LLED\020\004\022\n\n\006FAILED\020\005*\362\004\n\020CoreOperatorType\022" +
+      "LLED\020\004\022\n\n\006FAILED\020\005*\372\004\n\020CoreOperatorType\022" +
       "\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" +
       "\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" +
       "_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" +
@@ -16545,9 +16545,9 @@ public final class UserBitShared {
       "_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQ" +
       "UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" +
       "T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" +
-      "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\013\n\007FL" +
-      "ATTEN\020\037B.\n\033org.apache.drill.exec.protoB\r" +
-      "UserBitSharedH\001"
+      "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017CO" +
+      "MPLEX_TO_JSON\020\037B.\n\033org.apache.drill.exec" +
+      ".protoB\rUserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index abd7b78..0485a95 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -53,7 +53,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     TEXT_SUB_SCAN(28),
     JSON_SUB_SCAN(29),
     INFO_SCHEMA_SUB_SCAN(30),
-    FLATTEN(31);
+    COMPLEX_TO_JSON(31);
     
     public final int number;
     
@@ -102,7 +102,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 28: return TEXT_SUB_SCAN;
             case 29: return JSON_SUB_SCAN;
             case 30: return INFO_SCHEMA_SUB_SCAN;
-            case 31: return FLATTEN;
+            case 31: return COMPLEX_TO_JSON;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2e07b0b8/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index eb56efb..fc6f1b5 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -198,5 +198,5 @@ enum CoreOperatorType {
   TEXT_SUB_SCAN = 28;
   JSON_SUB_SCAN = 29;
   INFO_SCHEMA_SUB_SCAN = 30;
-  FLATTEN = 31;
+  COMPLEX_TO_JSON = 31;
 }