Posted to commits@drill.apache.org by bo...@apache.org on 2018/07/19 06:09:27 UTC

[drill] branch master updated (6bb0879 -> b1aca33)

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 6bb0879  DRILL-6612: Query fails with AssertionError when joining persistent and temporary tables
     new 2c92ea2  DRILL-6496: Added print methods for debugging tests, and fixed missing log statement in VectorUtils.
     new 5a0c75f  DRILL-6475: Unnest: Null fieldId Pointer.
     new b1aca33  DRILL-6104: Add Log/Regex Format Plugin

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mapr/drill/maprdb/tests/json/BaseJsonTest.java |   6 +-
 .../java/org/apache/drill/hbase/BaseHBaseTest.java |   8 +-
 .../drill/hbase/TestHBaseCFAsJSONString.java       |   2 +-
 .../org/apache/drill/hbase/TestHBaseQueries.java   |   2 +-
 .../drill/exec/store/kafka/KafkaTestBase.java      |   8 +-
 .../drill/exec/store/mongo/MongoTestBase.java      |  10 +-
 docs/dev/TestLogging.md                            |  37 +
 docs/dev/Testing.md                                |   1 -
 .../java/org/apache/drill/exec/client/DumpCat.java |   2 +-
 ...tsListener.java => LoggingResultsListener.java} |  18 +-
 .../apache/drill/exec/client/QuerySubmitter.java   |   2 +-
 .../visitor/AdjustOperatorsSchemaVisitor.java      | 148 ++++
 .../physical/visitor/JoinPrelRenameVisitor.java    |  87 ---
 .../planner/sql/handlers/DefaultSqlHandler.java    |   5 +-
 .../drill/exec/store/log/LogFormatConfig.java      | 119 ++++
 .../drill/exec/store/log/LogFormatField.java       |  86 +++
 .../LogFormatPlugin.java}                          |  62 +-
 .../drill/exec/store/log/LogRecordReader.java      | 764 +++++++++++++++++++++
 .../java/org/apache/drill/exec/store/log/README.md |  86 +++
 .../util/{JarUtil.java => CheckedSupplier.java}    |  22 +-
 .../org/apache/drill/exec/util/VectorUtil.java     |  14 +-
 .../drill/TestTpchDistributedConcurrent.java       |   2 +-
 .../exec/physical/impl/TestConvertFunctions.java   |   2 +-
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  50 +-
 .../impl/lateraljoin/TestLateralPlans.java         |  24 +
 .../impl/project/TestSimpleProjection.java         |   2 +-
 .../impl/xsort/TestSortSpillWithException.java     |   4 +-
 .../org/apache/drill/exec/pop/PopUnitTestBase.java |   2 -
 .../exec/server/options/TestConfigLinkage.java     |   4 +-
 .../store/dfs/TestFormatPluginOptionExtractor.java |  14 +-
 .../apache/drill/exec/store/log/TestLogReader.java | 366 ++++++++++
 .../exec/store/text/TextRecordReaderTest.java      |   2 +-
 .../drill/exec/testing/TestResourceLeak.java       |   2 +-
 .../java/org/apache/drill/test/BaseTestQuery.java  |  15 +-
 .../drill/test/BufferingQueryEventListener.java    |   5 +-
 .../java/org/apache/drill/test/ClientFixture.java  |  31 +-
 .../java/org/apache/drill/test/ClusterFixture.java |   3 +-
 .../java/org/apache/drill/test/ClusterTest.java    |  40 +-
 .../java/org/apache/drill/test/ExampleTest.java    |   8 +-
 .../apache/drill/test/PrintingResultsListener.java |  56 ++
 .../java/org/apache/drill/test/PrintingUtils.java  |  75 ++
 .../java/org/apache/drill/test/QueryBuilder.java   |  65 +-
 .../java/org/apache/drill/test/QueryTestUtil.java  | 111 ++-
 .../src/test/resources/regex/baddates.log2         |   5 +
 .../src/test/resources/regex/mysql.sqllog          |   6 +
 .../src/test/resources/regex/mysql.sqllog2         |   6 +
 .../java-exec/src/test/resources/regex/simple.log1 |   5 +
 .../java-exec/src/test/resources/regex/simple.log2 |   5 +
 pom.xml                                            |   6 +-
 .../org/apache/drill/exec/proto/UserBitShared.java |  10 +
 50 files changed, 2105 insertions(+), 310 deletions(-)
 rename exec/java-exec/src/main/java/org/apache/drill/exec/client/{PrintingResultsListener.java => LoggingResultsListener.java} (84%)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java
 delete mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
 copy exec/java-exec/src/main/java/org/apache/drill/exec/store/{image/ImageFormatPlugin.java => log/LogFormatPlugin.java} (51%)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
 copy exec/java-exec/src/main/java/org/apache/drill/exec/util/{JarUtil.java => CheckedSupplier.java} (67%)
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/test/PrintingResultsListener.java
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
 create mode 100644 exec/java-exec/src/test/resources/regex/baddates.log2
 create mode 100644 exec/java-exec/src/test/resources/regex/mysql.sqllog
 create mode 100644 exec/java-exec/src/test/resources/regex/mysql.sqllog2
 create mode 100644 exec/java-exec/src/test/resources/regex/simple.log1
 create mode 100644 exec/java-exec/src/test/resources/regex/simple.log2


[drill] 01/03: DRILL-6496: Added print methods for debugging tests, and fixed missing log statement in VectorUtils.

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 2c92ea22d9aa098cf3aaaf7893bd2fc33a08af31
Author: Timothy Farkas <ti...@apache.org>
AuthorDate: Mon Jun 25 13:50:37 2018 -0700

    DRILL-6496: Added print methods for debugging tests, and fixed missing log statement in VectorUtils.
    
    closes #1336
---
 .../mapr/drill/maprdb/tests/json/BaseJsonTest.java |   6 +-
 .../java/org/apache/drill/hbase/BaseHBaseTest.java |   8 +-
 .../drill/hbase/TestHBaseCFAsJSONString.java       |   2 +-
 .../org/apache/drill/hbase/TestHBaseQueries.java   |   2 +-
 .../drill/exec/store/kafka/KafkaTestBase.java      |   8 +-
 .../drill/exec/store/mongo/MongoTestBase.java      |  10 +-
 docs/dev/TestLogging.md                            |  37 +++++++
 docs/dev/Testing.md                                |   1 -
 .../java/org/apache/drill/exec/client/DumpCat.java |   2 +-
 ...tsListener.java => LoggingResultsListener.java} |  18 ++--
 .../apache/drill/exec/client/QuerySubmitter.java   |   2 +-
 .../apache/drill/exec/util/CheckedSupplier.java    |  29 ++++++
 .../org/apache/drill/exec/util/VectorUtil.java     |  14 +--
 .../drill/TestTpchDistributedConcurrent.java       |   2 +-
 .../exec/physical/impl/TestConvertFunctions.java   |   2 +-
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  50 +++++-----
 .../impl/project/TestSimpleProjection.java         |   2 +-
 .../impl/xsort/TestSortSpillWithException.java     |   4 +-
 .../org/apache/drill/exec/pop/PopUnitTestBase.java |   2 -
 .../exec/server/options/TestConfigLinkage.java     |   4 +-
 .../exec/store/text/TextRecordReaderTest.java      |   2 +-
 .../drill/exec/testing/TestResourceLeak.java       |   2 +-
 .../java/org/apache/drill/test/BaseTestQuery.java  |  15 ++-
 .../drill/test/BufferingQueryEventListener.java    |   5 +-
 .../java/org/apache/drill/test/ClientFixture.java  |  31 ++++--
 .../java/org/apache/drill/test/ClusterFixture.java |   3 +-
 .../java/org/apache/drill/test/ClusterTest.java    |  40 ++------
 .../java/org/apache/drill/test/ExampleTest.java    |   8 +-
 .../apache/drill/test/PrintingResultsListener.java |  56 +++++++++++
 .../java/org/apache/drill/test/PrintingUtils.java  |  75 ++++++++++++++
 .../java/org/apache/drill/test/QueryBuilder.java   |  65 +++++++-----
 .../java/org/apache/drill/test/QueryTestUtil.java  | 111 +++++++++++++++++----
 pom.xml                                            |   1 -
 33 files changed, 447 insertions(+), 172 deletions(-)

diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
index ee32aa1..550fb73 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/BaseJsonTest.java
@@ -55,11 +55,11 @@ public class BaseJsonTest extends BaseTestQuery {
 
   protected void runSQLAndVerifyCount(String sql, int expectedRowCount) throws Exception{
     List<QueryDataBatch> results = runHBaseSQLlWithResults(sql);
-    printResultAndVerifyRowCount(results, expectedRowCount);
+    logResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  private void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
-    int rowCount = printResult(results);
+  private void logResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
+    int rowCount = logResult(results);
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);
     }
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index dd7ce67..ab75eda 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -79,7 +79,7 @@ public class BaseHBaseTest extends BaseTestQuery {
   protected void runHBasePhysicalVerifyCount(String planFile, String tableName, int expectedRowCount) throws Exception{
     String physicalPlan = getPlanText(planFile, tableName);
     List<QueryDataBatch> results = testPhysicalWithResults(physicalPlan);
-    printResultAndVerifyRowCount(results, expectedRowCount);
+    logResultAndVerifyRowCount(results, expectedRowCount);
   }
 
   protected List<QueryDataBatch> runHBaseSQLlWithResults(String sql) throws Exception {
@@ -89,11 +89,11 @@ public class BaseHBaseTest extends BaseTestQuery {
 
   protected void runHBaseSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
     List<QueryDataBatch> results = runHBaseSQLlWithResults(sql);
-    printResultAndVerifyRowCount(results, expectedRowCount);
+    logResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  private void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
-    int rowCount = printResult(results);
+  private void logResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
+    int rowCount = logResult(results);
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);
     }
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
index 592dda0..50cda8f 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
@@ -55,7 +55,7 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest {
   public void testColumnFamiliesAsJSONString() throws Exception {
     setColumnWidths(new int[] {112, 12});
     List<QueryDataBatch> resultList = runHBaseSQLlWithResults("SELECT f, f2 FROM hbase.`[TABLE_NAME]` tableName LIMIT 1");
-    printResult(resultList);
+    logResult(resultList);
   }
 
 }
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java
index abd76a7..27882b5 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java
@@ -96,7 +96,7 @@ public class TestHBaseQueries extends BaseHBaseTest {
         List<QueryDataBatch> resultList = runHBaseSQLlWithResults("SELECT row_key,\n"
             + " CAST(t.f.c1 as INT) c1, CAST(t.f.c2 as BIGINT) c2, CAST(t.f.c3 as INT) c3,\n"
             + " CAST(t.f.c4 as INT) c4 FROM hbase.TestTableNullStr t where row_key='a1'");
-        printResult(resultList);
+        logResult(resultList);
     }
     finally {
         test("alter system reset `drill.exec.functions.cast_empty_string_to_null`;");
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index e30f3e6..83da934 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -66,12 +66,12 @@ public class KafkaTestBase extends PlanTestBase {
 
   public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
     List<QueryDataBatch> results = runKafkaSQLWithResults(sql);
-    printResultAndVerifyRowCount(results, expectedRowCount);
+    logResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  public void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount)
+  public void logResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount)
       throws SchemaChangeException {
-    int rowCount = printResult(results);
+    int rowCount = logResult(results);
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);
     }
@@ -89,4 +89,4 @@ public class KafkaTestBase extends PlanTestBase {
     TestKafkaSuit.tearDownCluster();
   }
 
-}
\ No newline at end of file
+}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
index 923c648..4b4412f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestBase.java
@@ -64,12 +64,12 @@ public class MongoTestBase extends PlanTestBase implements MongoTestConstants {
   public void runMongoSQLVerifyCount(String sql, int expectedRowCount)
       throws Exception {
     List<QueryDataBatch> results = runMongoSQLWithResults(sql);
-    printResultAndVerifyRowCount(results, expectedRowCount);
+    logResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  public void printResultAndVerifyRowCount(List<QueryDataBatch> results,
-      int expectedRowCount) throws SchemaChangeException {
-    int rowCount = printResult(results);
+  public void logResultAndVerifyRowCount(List<QueryDataBatch> results,
+                                         int expectedRowCount) throws SchemaChangeException {
+    int rowCount = logResult(results);
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);
     }
@@ -92,4 +92,4 @@ public class MongoTestBase extends PlanTestBase implements MongoTestConstants {
     storagePlugin = null;
   }
 
-}
\ No newline at end of file
+}
diff --git a/docs/dev/TestLogging.md b/docs/dev/TestLogging.md
index b87adb1..59fec54 100644
--- a/docs/dev/TestLogging.md
+++ b/docs/dev/TestLogging.md
@@ -103,3 +103,40 @@ Then, if for some reason you want to see the Logback logging, add the following
 -Dlogback.statusListenerClass=ch.qos.logback.core.status.OnConsoleStatusListener
 ```
 The launch configuration option overrides (appears on the Java command line after) the global setting.
+
+## Test Logging Configurations
+
+### Default Test Log Levels
+
+There is a global `logback-test.xml` configuration file in [common/src/test/resources/logback-test.xml](../../common/src/test/resources/logback-test.xml). By
+default, this configuration outputs error-level logs to stdout.
+
+Debug-level logging to Lilith can be enabled by adding `-Ddrill.lilith.enable=true` to the command used to run tests.
+
+### Changing Test Log Levels
+
+Often it is most convenient to output logs to the console for debugging. This is best done programmatically
+using the [LogFixture](../../exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java), which
+allows temporarily changing log levels for blocks of code. An example of doing this is
+the following.
+
+```
+    try(LogFixture logFixture = new LogFixture.LogFixtureBuilder()
+      .logger(MyClass.class, Level.INFO)
+      .toConsole() // This redirects output to stdout
+      .build()) {
+      // Code block with different log levels.
+    }
+```
+
+More details on how to use the [LogFixture](../../exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java) can be found
+in the javadocs for the class. Additionally, there are several methods that allow printing of query results to the console for debugging:
+
+ * BaseTestQuery.printResult
+ * QueryTestUtil.testRunAndPrint
+ * QueryBuilder.print
+ * ClusterTest.runAndPrint
+ * ClientFixture.runQueriesAndPrint
+ 
+**IMPORTANT NOTE:** The methods described above, along with `LogFixtureBuilder.toConsole()`, should only be used for debugging. Code
+that uses these methods should not be committed, since they produce excess logging on our build servers.
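
Taken together, the additions above suggest the following pattern. A minimal sketch, assuming a test class that extends the `ClusterTest` fixture updated later in this commit; the logger target and query are illustrative:

```java
import ch.qos.logback.classic.Level;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.LogFixture;
import org.junit.Test;

// Hypothetical debug-only test; cluster startup (@BeforeClass) is omitted.
public class MyDebugTest extends ClusterTest {

  @Test
  public void debugWithConsoleOutput() throws Exception {
    // Temporarily raise one logger to DEBUG and redirect output to stdout.
    try (LogFixture logFixture = new LogFixture.LogFixtureBuilder()
        .logger(MyDebugTest.class, Level.DEBUG)
        .toConsole()
        .build()) {
      // Debug-only print variant; switch back to runAndLog() before committing.
      runAndPrint("SELECT * FROM cp.`employee.json` LIMIT 5");
    }
  }
}
```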
diff --git a/docs/dev/Testing.md b/docs/dev/Testing.md
index 09343d6..148c7b1 100644
--- a/docs/dev/Testing.md
+++ b/docs/dev/Testing.md
@@ -149,7 +149,6 @@ Drill uses the [Maven Surefire plugin](http://maven.apache.org/components/surefi
               -Ddrill.exec.sys.store.provider.local.write=false
               -Dorg.apache.drill.exec.server.Drillbit.system_options=\
                "org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on"
-              -Ddrill.test.query.printing.silent=true
               -Ddrill.catastrophic_to_standard_out=true
               -XX:MaxPermSize=512M -XX:MaxDirectMemorySize=3072M
               -Djava.net.preferIPv4Stack=true
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
index 6c85921..ff0d0b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
@@ -261,7 +261,7 @@ public class DumpCat {
     }
 
     /* show the contents in the batch */
-    VectorUtil.showVectorAccessibleContent(vectorContainer);
+    VectorUtil.logVectorAccessibleContent(vectorContainer);
   }
 
   /* Get batch meta info : rows, selectedRows, dataSize */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
similarity index 84%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
index c233837..454abfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
@@ -40,8 +40,8 @@ import com.google.common.base.Stopwatch;
 
 import io.netty.buffer.DrillBuf;
 
-public class PrintingResultsListener implements UserResultsListener {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PrintingResultsListener.class);
+public class LoggingResultsListener implements UserResultsListener {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoggingResultsListener.class);
 
   private final AtomicInteger count = new AtomicInteger();
   private final Stopwatch w = Stopwatch.createUnstarted();
@@ -50,7 +50,7 @@ public class PrintingResultsListener implements UserResultsListener {
   private final int columnWidth;
   private final BufferAllocator allocator;
 
-  public PrintingResultsListener(DrillConfig config, Format format, int columnWidth) {
+  public LoggingResultsListener(DrillConfig config, Format format, int columnWidth) {
     this.allocator = RootAllocatorFactory.newRoot(config);
     this.loader = new RecordBatchLoader(allocator);
     this.format = format;
@@ -59,15 +59,13 @@ public class PrintingResultsListener implements UserResultsListener {
 
   @Override
   public void submissionFailed(UserException ex) {
-    System.out.println("Exception (no rows returned): " + ex + ".  Returned in " + w.elapsed(TimeUnit.MILLISECONDS)
-        + "ms.");
+    logger.info("Exception (no rows returned). Returned in {} ms.", w.elapsed(TimeUnit.MILLISECONDS), ex);
   }
 
   @Override
   public void queryCompleted(QueryState state) {
     DrillAutoCloseables.closeNoChecked(allocator);
-    System.out.println("Total rows returned : " + count.get() + ".  Returned in " + w.elapsed(TimeUnit.MILLISECONDS)
-        + "ms.");
+    logger.info("Total rows returned: {}. Returned in {} ms.", count.get(), w.elapsed(TimeUnit.MILLISECONDS));
   }
 
   @Override
@@ -90,13 +88,13 @@ public class PrintingResultsListener implements UserResultsListener {
         try {
           switch(format) {
             case TABLE:
-              VectorUtil.showVectorAccessibleContent(loader, columnWidth);
+              VectorUtil.logVectorAccessibleContent(loader, columnWidth);
               break;
             case TSV:
-              VectorUtil.showVectorAccessibleContent(loader, "\t");
+              VectorUtil.logVectorAccessibleContent(loader, "\t");
               break;
             case CSV:
-              VectorUtil.showVectorAccessibleContent(loader, ",");
+              VectorUtil.logVectorAccessibleContent(loader, ",");
               break;
             default:
               throw new IllegalStateException(format.toString());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index d9f47b5..f9b2d00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -189,7 +189,7 @@ public class QuerySubmitter {
     Stopwatch watch = Stopwatch.createUnstarted();
     for (String query : queries) {
       AwaitableUserResultsListener listener =
-          new AwaitableUserResultsListener(new PrintingResultsListener(client.getConfig(), outputFormat, width));
+          new AwaitableUserResultsListener(new LoggingResultsListener(client.getConfig(), outputFormat, width));
       watch.start();
       client.runQuery(queryType, query, listener);
       int rows = listener.await();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
new file mode 100644
index 0000000..b744ac8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+/**
+ * The Java standard library does not provide a functional interface for functions that take no arguments
+ * but throw a checked exception, so we define our own here.
+ * @param <T> The return type of the lambda function.
+ * @param <E> The type of exception thrown by the lambda function.
+ */
+@FunctionalInterface
+public interface CheckedSupplier<T, E extends Exception> {
+  T get() throws E;
+}
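
A hedged usage sketch of the new interface: the second type parameter lets a checked exception flow out of `get()` with its precise type, which is what the `PrintingUtils.printAndThrow(() -> logResult(results))` call later in this commit relies on. The demo class below is illustrative:

```java
import java.io.IOException;
import org.apache.drill.exec.util.CheckedSupplier;

public class CheckedSupplierDemo {
  public static void main(String[] args) throws IOException {
    // The exception type parameter E is carried through to get(),
    // so no wrapping in RuntimeException is needed.
    CheckedSupplier<Integer, IOException> rowCount = () -> {
      if (args.length == 0) {
        throw new IOException("illustrative failure");
      }
      return args.length;
    };
    System.out.println("rows: " + rowCount.get());
  }
}
```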
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index 9808a2b..8729a39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -39,7 +39,7 @@ public class VectorUtil {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorUtil.class);
   public static final int DEFAULT_COLUMN_WIDTH = 15;
 
-  public static void showVectorAccessibleContent(VectorAccessible va, final String delimiter) {
+  public static void logVectorAccessibleContent(VectorAccessible va, final String delimiter) {
     final StringBuilder sb = new StringBuilder();
     int rows = va.getRecordCount();
     sb.append(rows).append(" row(s):\n");
@@ -133,15 +133,15 @@ public class VectorUtil {
     }
   }
 
-  public static void showVectorAccessibleContent(VectorAccessible va) {
-    showVectorAccessibleContent(va, DEFAULT_COLUMN_WIDTH);
+  public static void logVectorAccessibleContent(VectorAccessible va) {
+    logVectorAccessibleContent(va, DEFAULT_COLUMN_WIDTH);
   }
 
-  public static void showVectorAccessibleContent(VectorAccessible va, int columnWidth) {
-    showVectorAccessibleContent(va, new int[]{ columnWidth });
+  public static void logVectorAccessibleContent(VectorAccessible va, int columnWidth) {
+    logVectorAccessibleContent(va, new int[]{ columnWidth });
   }
 
-  public static void showVectorAccessibleContent(VectorAccessible va, int[] columnWidths) {
+  public static void logVectorAccessibleContent(VectorAccessible va, int[] columnWidths) {
     final StringBuilder sb = new StringBuilder();
     int width = 0;
     int columnIndex = 0;
@@ -194,6 +194,8 @@ public class VectorUtil {
     for (VectorWrapper<?> vw : va) {
       vw.clear();
     }
+
+    logger.info(sb.toString());
   }
 
   private static String expandMapSchema(MaterializedField mapField) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index bd68b53..7258a97 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -181,7 +181,7 @@ public class TestTpchDistributedConcurrent extends BaseTestQuery {
 
   @Test
   public void testConcurrentQueries() throws Exception {
-    QueryTestUtil.testRunAndPrint(client, UserBitShared.QueryType.SQL, alterSession);
+    QueryTestUtil.testRunAndLog(client, UserBitShared.QueryType.SQL, alterSession);
 
     testThread = Thread.currentThread();
     final QuerySubmitter querySubmitter = new QuerySubmitter();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
index 581c972..2bff1da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestConvertFunctions.java
@@ -518,7 +518,7 @@ public class TestConvertFunctions extends BaseTestQuery {
       count += result.getHeader().getRowCount();
       loader.load(result.getHeader().getDef(), result.getData());
       if (loader.getRecordCount() > 0) {
-        VectorUtil.showVectorAccessibleContent(loader);
+        VectorUtil.logVectorAccessibleContent(loader);
       }
       loader.clear();
       result.release();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 394e732..88108a6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -61,7 +61,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
       "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) LIMIT 1) orders";
-    test(Sql);
+    runAndLog(Sql);
   }
 
   @Test
@@ -69,7 +69,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
       "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10) orders";
-    test(Sql);
+    runAndLog(Sql);
   }
 
   @Test
@@ -77,7 +77,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
       "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders";
-    test(Sql);
+    runAndLog(Sql);
   }
 
   @Test
@@ -105,7 +105,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   @Test
   public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
 
-    test("alter session set `planner.enable_topn`=false");
+    runAndLog("alter session set `planner.enable_topn`=false");
 
     String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
@@ -123,7 +123,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
         .baselineValues("customer4", 32.0,  1030.1)
         .go();
     } finally {
-      test("alter session set `planner.enable_topn`=true");
+      runAndLog("alter session set `planner.enable_topn`=true");
     }
   }
 
@@ -149,7 +149,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer OUTER APPLY " +
       "(SELECT t.ord.o_id as o_id , t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders";
-    test(Sql);
+    runAndLog(Sql);
   }
 
   @Test
@@ -157,7 +157,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer LEFT JOIN LATERAL " +
       "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) WHERE t.ord.o_amount > 10 LIMIT 1) orders ON TRUE";
-    test(Sql);
+    runAndLog(Sql);
   }
 
   @Test
@@ -167,7 +167,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       " (SELECT t.ord.o_id AS order_id, t.ord.o_amount AS order_amt, U2.item_name AS itemName, U2.item_num AS " +
         "itemNum FROM UNNEST(customer.orders) t(ord) , LATERAL" +
       " (SELECT t1.ord.i_name AS item_name, t1.ord.i_number AS item_num FROM UNNEST(t.ord) AS t1(ord)) AS U2) AS U1";
-    test(Sql);
+    runAndLog(Sql);
   }
 
   @Test
@@ -261,7 +261,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   public void testNestedUnnest() throws Exception {
     String Sql = "select * from (select customer.orders as orders from cp.`lateraljoin/nested-customer.parquet` customer ) t1," +
         " lateral ( select t.ord.items as items from unnest(t1.orders) t(ord) ) t2, unnest(t2.items) t3(item) ";
-    test(Sql);
+    runAndLog(Sql);
   }
 
   /***********************************************************************************************
@@ -273,7 +273,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders";
-    test(sql);
+    runAndLog(sql);
   }
 
   @Test
@@ -281,7 +281,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) LIMIT 10) orders";
-    test(sql);
+    runAndLog(sql);
   }
 
   @Test
@@ -303,7 +303,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   @Test
   public void testMultipleBatchesLateral_WithSortAndLimitInSubQuery() throws Exception {
 
-    test("alter session set `planner.enable_topn`=false");
+    runAndLog("alter session set `planner.enable_topn`=false");
 
     String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
@@ -319,7 +319,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
         .baselineValues("Customer#000007180", (long)54646821, 367189.55)
         .go();
     } finally {
-      test("alter session set `planner.enable_topn`=true");
+      runAndLog("alter session set `planner.enable_topn`=true");
     }
   }
 
@@ -346,7 +346,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
       "orders";
-    test(sql);
+    runAndLog(sql);
   }
 
   /***********************************************************************************************
@@ -362,7 +362,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
         "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
         "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST (customer.c_orders) t(ord)) orders";
-      test(sql);
+      runAndLog(sql);
     } catch (Exception ex) {
       fail();
     } finally {
@@ -393,7 +393,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
         "FROM UNNEST(customer.c_orders) t1(o)) orders, " +
         "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
         "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
-      test(sql);
+      runAndLog(sql);
     } catch (Exception ex) {
       fail();
     } finally {
@@ -409,7 +409,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       String sql = "SELECT customer.c_name, customer.c_address, orders.o_orderkey, orders.o_totalprice " +
         "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
         "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders";
-      test(sql);
+      runAndLog(sql);
     } catch (Exception ex) {
       fail();
     } finally {
@@ -429,7 +429,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
         " t1.o.o_shippriority as spriority FROM UNNEST(customer.c_orders) t1(o)) orders, " +
         "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
         "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
-      test(sql);
+      runAndLog(sql);
     } catch (Exception ex) {
       fail();
     } finally {
@@ -446,7 +446,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
         "orders.o_totalprice FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
         "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice, t.ord.o_shippriority o_shippriority FROM UNNEST(customer.c_orders) t(ord)) orders";
 
-      test(sql);
+      runAndLog(sql);
     } catch (Exception ex) {
       fail();
     } finally {
@@ -464,7 +464,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice  as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
       "orders LIMIT 1";
-    test(sql);
+    runAndLog(sql);
   }
 
   @Test
@@ -473,7 +473,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
       "orders WHERE orders.o_totalprice > 240000";
-    test(sql);
+    runAndLog(sql);
   }
 
   @Test
@@ -482,7 +482,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord) WHERE t.ord.o_totalprice > 100000 LIMIT 2) " +
       "orders GROUP BY customer.c_name";
-    test(sql);
+    runAndLog(sql);
   }
 
   @Test
@@ -491,7 +491,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
       "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)) orders " +
       "ORDER BY orders.o_orderkey";
-    test(sql);
+    runAndLog(sql);
   }
 
   @Test
@@ -515,7 +515,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       .baselineValues(177819)
       .build().run();
     } finally {
-      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
+      runAndLog("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
     }
   }
 
@@ -541,7 +541,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       .baselineValues("dd",222L)
       .build().run();
     } finally {
-      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
+      runAndLog("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index b2a899d..d71ac76 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -63,7 +63,7 @@ public class TestSimpleProjection extends ExecTest {
     final SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while (exec.next()) {
-      VectorUtil.showVectorAccessibleContent(exec.getIncoming(), "\t");
+      VectorUtil.logVectorAccessibleContent(exec.getIncoming(), "\t");
       final NullableBigIntVector c1 = exec.getValueVectorById(new SchemaPath("col1", ExpressionPosition.UNKNOWN), NullableBigIntVector.class);
       final NullableBigIntVector c2 = exec.getValueVectorById(new SchemaPath("col2", ExpressionPosition.UNKNOWN), NullableBigIntVector.class);
       final NullableBigIntVector.Accessor a1 = c1.getAccessor();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
index 5ab29de..a8797cd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -86,7 +86,7 @@ public class TestSortSpillWithException extends ClusterTest {
     ControlsInjectionUtil.setControls(cluster.client(), controls);
     // run a simple order by query
     try {
-      test("select employee_id from dfs.`xsort/2batches` order by employee_id");
+      runAndLog("select employee_id from dfs.`xsort/2batches` order by employee_id");
       fail("Query should have failed!");
     } catch (UserRemoteException e) {
       assertEquals(ErrorType.RESOURCE, e.getErrorType());
@@ -109,7 +109,7 @@ public class TestSortSpillWithException extends ClusterTest {
     ControlsInjectionUtil.setControls(cluster.client(), controls);
     // run a simple order by query
     try {
-      test("SELECT id_i, name_s250 FROM `mock`.`employee_500K` ORDER BY id_i");
+      runAndLog("SELECT id_i, name_s250 FROM `mock`.`employee_500K` ORDER BY id_i");
       fail("Query should have failed!");
     } catch (UserRemoteException e) {
       assertEquals(ErrorType.RESOURCE, e.getErrorType());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index dbabcac..aabf9c4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.pop;
 import java.io.IOException;
 import java.util.Properties;
 
-import org.apache.drill.test.QueryTestUtil;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.DrillFileUtils;
 import org.apache.drill.exec.ExecConstants;
@@ -54,7 +53,6 @@ public abstract class PopUnitTestBase  extends ExecTest{
     props.put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
     props.put(ExecConstants.HTTP_ENABLE, "false");
     props.put(Drillbit.SYSTEM_OPTIONS_NAME, "org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on");
-    props.put(QueryTestUtil.TEST_QUERY_PRINTING_SILENT, "true");
     props.put("drill.catastrophic_to_standard_out", "true");
     CONFIG = DrillConfig.create(props);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/TestConfigLinkage.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/TestConfigLinkage.java
index d76e209..5b9ae97 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/TestConfigLinkage.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/TestConfigLinkage.java
@@ -317,8 +317,8 @@ public class TestConfigLinkage {
          ClientFixture client = cluster.clientFixture()) {
       client.queryBuilder().sql("ALTER SYSTEM SET `%s` = 'bleh'", MOCK_PROPERTY).run();
 
-      client.queryBuilder().sql("SELECT * FROM sys.%s", SystemTable.INTERNAL_OPTIONS.getTableName()).printCsv();
-      client.queryBuilder().sql("SELECT * FROM sys.%s", SystemTable.INTERNAL_OPTIONS_VAL.getTableName()).printCsv();
+      client.queryBuilder().sql("SELECT * FROM sys.%s", SystemTable.INTERNAL_OPTIONS.getTableName()).logCsv();
+      client.queryBuilder().sql("SELECT * FROM sys.%s", SystemTable.INTERNAL_OPTIONS_VAL.getTableName()).logCsv();
 
       String mockProp = client.queryBuilder().
         sql("SELECT string_val FROM sys.%s where name='%s'", SystemTable.INTERNAL_OPTIONS, MOCK_PROPERTY).singletonString();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
index 342ed68..c1e63a9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
@@ -56,7 +56,7 @@ public class TextRecordReaderTest extends PopUnitTestBase {
           count += b.getHeader().getRowCount();
         }
         loader.load(b.getHeader().getDef(), b.getData());
-        VectorUtil.showVectorAccessibleContent(loader);
+        VectorUtil.logVectorAccessibleContent(loader);
         loader.clear();
         b.release();
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
index 6efcabc..1f5ee9f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -94,7 +94,7 @@ public class TestResourceLeak extends DrillTest {
   public void tpch01() throws Exception {
     final String query = getFile("memory/tpch01_memory_leak.sql");
     try {
-      QueryTestUtil.test(client, "alter session set `planner.slice_target` = 10; " + query);
+      QueryTestUtil.testRunAndLog(client, "alter session set `planner.slice_target` = 10; " + query);
     } catch (UserRemoteException e) {
       if (e.getMessage().contains("Allocator closed with outstanding buffers allocated")) {
         return;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index ef67d58..db62bf0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -347,7 +347,7 @@ public class BaseTestQuery extends ExecTest {
   }
 
   public static int testRunAndPrint(final QueryType type, final String query) throws Exception {
-    return QueryTestUtil.testRunAndPrint(client, type, query);
+    return QueryTestUtil.testRunAndLog(client, type, query);
   }
 
   protected static void testWithListener(QueryType type, String query, UserResultsListener resultListener) {
@@ -389,11 +389,11 @@ public class BaseTestQuery extends ExecTest {
   }
 
   public static void test(String query, Object... args) throws Exception {
-    QueryTestUtil.test(client, String.format(query, args));
+    QueryTestUtil.testRunAndLog(client, String.format(query, args));
   }
 
   public static void test(final String query) throws Exception {
-    QueryTestUtil.test(client, query);
+    QueryTestUtil.testRunAndLog(client, query);
   }
 
   protected static int testPhysical(String query) throws Exception{
@@ -513,7 +513,7 @@ public class BaseTestQuery extends ExecTest {
     this.columnWidths = columnWidths;
   }
 
-  protected int printResult(List<QueryDataBatch> results) throws SchemaChangeException {
+  protected int logResult(List<QueryDataBatch> results) throws SchemaChangeException {
     int rowCount = 0;
     final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
     for(final QueryDataBatch result : results) {
@@ -521,13 +521,18 @@ public class BaseTestQuery extends ExecTest {
       loader.load(result.getHeader().getDef(), result.getData());
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
       // SchemaChangeException, so check/clean throw clause above.
-      VectorUtil.showVectorAccessibleContent(loader, columnWidths);
+      VectorUtil.logVectorAccessibleContent(loader, columnWidths);
       loader.clear();
       result.release();
     }
     return rowCount;
   }
 
+  protected int printResult(final List<QueryDataBatch> results) throws SchemaChangeException {
+    int result = PrintingUtils.printAndThrow(() -> logResult(results));
+    return result;
+  }
+
   protected static String getResultString(List<QueryDataBatch> results, String delimiter)
       throws SchemaChangeException {
     final StringBuilder formattedResults = new StringBuilder();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
index 6d68757..a47e54d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
@@ -39,6 +39,8 @@ import com.google.common.collect.Queues;
 
 public class BufferingQueryEventListener implements UserResultsListener
 {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferingQueryEventListener.class);
+
   public static class QueryEvent
   {
     public enum Type { QUERY_ID, BATCH, EOF, ERROR }
@@ -96,8 +98,7 @@ public class BufferingQueryEventListener implements UserResultsListener
     try {
       queue.put(event);
     } catch (InterruptedException e) {
-      // What to do, what to do...
-      e.printStackTrace();
+      logger.error("Exception:", e);
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index a13789f..b43e9d7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -180,14 +180,10 @@ public class ClientFixture implements AutoCloseable {
   }
 
   /**
-   * Run zero or more queries and optionally print the output in TSV format.
-   * Similar to {@link QueryTestUtil#test}. Output is printed
-   * only if the tests are running as verbose.
-   *
-   * @return the number of rows returned
+   * Run zero or more queries and output the results in TSV format.
    */
-
-  public void runQueries(final String queryString) throws Exception{
+  private void runQueriesAndOutput(final String queryString,
+                                   final boolean print) throws Exception {
     final String query = QueryTestUtil.normalizeQuery(queryString);
     String[] queries = query.split(";");
     for (String q : queries) {
@@ -195,11 +191,30 @@ public class ClientFixture implements AutoCloseable {
       if (trimmedQuery.isEmpty()) {
         continue;
       }
-      queryBuilder().sql(trimmedQuery).print();
+
+      if (print) {
+        queryBuilder().sql(trimmedQuery).print();
+      } else {
+        queryBuilder().sql(trimmedQuery).log();
+      }
     }
   }
 
   /**
+   * Run zero or more queries and log the output in TSV format.
+   */
+  public void runQueriesAndLog(final String queryString) throws Exception {
+    runQueriesAndOutput(queryString, false);
+  }
+
+  /**
+   * Run zero or more queries and print the output in TSV format.
+   */
+  public void runQueriesAndPrint(final String queryString) throws Exception {
+    runQueriesAndOutput(queryString, true);
+  }
+
+  /**
    * Plan a query without execution.
    * @throws ExecutionException
    * @throws InterruptedException
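
In effect, `runQueries` was split into a logging default and an explicit print variant. A hedged sketch of the resulting call sites, assuming `client` is a connected `ClientFixture`:

```java
// Both methods normalize the script and split it on ';', running each
// non-empty statement through the query builder.
client.runQueriesAndLog(
    "ALTER SESSION SET `planner.slice_target` = 10;" +
    "SELECT * FROM cp.`employee.json` LIMIT 5");

// Debug-only variant that prints to stdout; not meant to be committed.
client.runQueriesAndPrint("SELECT 1; SELECT 2");
```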
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 77df009..b393db0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -91,7 +91,6 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
 
       put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, DFS_TMP_SCHEMA);
       put(ExecConstants.HTTP_ENABLE, false);
-      put(QueryTestUtil.TEST_QUERY_PRINTING_SILENT, true);
       put("drill.catastrophic_to_standard_out", true);
 
       // Verbose errors.
@@ -570,7 +569,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
 
     @Override
     public void test(String query) throws Exception {
-      client.runQueries(query);
+      client.runQueriesAndLog(query);
     }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
index 0a770a0..57ba711 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
@@ -20,7 +20,6 @@ package org.apache.drill.test;
 import java.io.IOException;
 
 import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.test.rowSet.RowSet;
 import org.junit.AfterClass;
 import org.junit.ClassRule;
 
@@ -110,44 +109,19 @@ public class ClusterTest extends DrillTest {
     return ClusterFixture.getResource(resource);
   }
 
-  public void test(String sqlQuery) throws Exception {
-    client.runQueries(sqlQuery);
+  public void runAndLog(String sqlQuery) throws Exception {
+    client.runQueriesAndLog(sqlQuery);
   }
 
-  public static void test(String query, Object... args) throws Exception {
+  public void runAndPrint(String sqlQuery) throws Exception {
+    client.runQueriesAndPrint(sqlQuery);
+  }
+
+  public static void run(String query, Object... args) throws Exception {
     client.queryBuilder().sql(query, args).run( );
   }
 
   public QueryBuilder queryBuilder( ) {
     return client.queryBuilder();
   }
-
-  /**
-   * Handy development-time tool to run a query and print the results. Use this
-   * when first developing tests. Then, encode the expected results using
-   * the appropriate tool and verify them rather than just printing them to
-   * create the final test.
-   *
-   * @param sql the query to run
-   */
-
-  protected void runAndPrint(String sql) {
-    QueryResultSet results = client.queryBuilder().sql(sql).resultSet();
-    try {
-      for (;;) {
-        RowSet rowSet = results.next();
-        if (rowSet == null) {
-          break;
-        }
-        if (rowSet.rowCount() > 0) {
-          rowSet.print();
-        }
-        rowSet.clear();
-      }
-    } catch (Exception e) {
-      throw new IllegalStateException(e);
-    } finally {
-      results.close();
-    }
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
index 6e3893e..77ee6e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ExampleTest.java
@@ -90,7 +90,7 @@ public class ExampleTest {
   public void firstTest() throws Exception {
     try (ClusterFixture cluster = ClusterFixture.standardCluster(dirTestWatcher);
          ClientFixture client = cluster.clientFixture()) {
-      client.queryBuilder().sql("SELECT * FROM `cp`.`employee.json` LIMIT 10").printCsv();
+      client.queryBuilder().sql("SELECT * FROM `cp`.`employee.json` LIMIT 10").logCsv();
     }
   }
 
@@ -170,7 +170,7 @@ public class ExampleTest {
     try (ClusterFixture cluster = ClusterFixture.standardCluster(dirTestWatcher);
          ClientFixture client = cluster.clientFixture()) {
       String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_5`";
-      client.queryBuilder().sql(sql).printCsv();
+      client.queryBuilder().sql(sql).logCsv();
     }
   }
 
@@ -268,7 +268,7 @@ public class ExampleTest {
     try (ClusterFixture cluster = ClusterFixture.standardCluster(dirTestWatcher);
          ClientFixture client = cluster.clientFixture()) {
       cluster.defineWorkspace("dfs", "resources", TestTools.TEST_RESOURCES_ABS.toFile().getAbsolutePath(), "tsv");
-      client.queryBuilder().sql("SELECT * from dfs.resources.`testframework/small_test_data.tsv`").printCsv();
+      client.queryBuilder().sql("SELECT * from dfs.resources.`testframework/small_test_data.tsv`").logCsv();
     }
   }
 
@@ -280,7 +280,7 @@ public class ExampleTest {
     try (ClusterFixture cluster = ClusterFixture.standardCluster(dirTestWatcher);
          ClientFixture client = cluster.clientFixture()) {
       cluster.defineWorkspace("dfs", "sampledata", TestTools.SAMPLE_DATA.toFile().getAbsolutePath(), "parquet");
-      client.queryBuilder().sql("SELECT * from dfs.sampledata.`nation.parquet`").printCsv();
+      client.queryBuilder().sql("SELECT * from dfs.sampledata.`nation.parquet`").logCsv();
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingResultsListener.java b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingResultsListener.java
new file mode 100644
index 0000000..f5cd995
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingResultsListener.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.client.LoggingResultsListener;
+import org.apache.drill.exec.client.QuerySubmitter;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+
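+/**
+ * A results listener that prints query results to stdout by wrapping each
+ * {@link LoggingResultsListener} callback in {@link PrintingUtils#print},
+ * which redirects the logged output to the console.
+ */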
+public class PrintingResultsListener extends LoggingResultsListener {
+  public PrintingResultsListener(DrillConfig config, QuerySubmitter.Format format, int columnWidth) {
+    super(config, format, columnWidth);
+  }
+
+  @Override
+  public void submissionFailed(UserException ex) {
+    PrintingUtils.print(() -> {
+      super.submissionFailed(ex);
+      return null;
+    });
+  }
+
+  @Override
+  public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
+    PrintingUtils.print(() -> {
+      super.queryCompleted(state);
+      return null;
+    });
+  }
+
+  @Override
+  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+    PrintingUtils.print(() -> {
+      super.dataArrived(result, throttle);
+      return null;
+    });
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
new file mode 100644
index 0000000..1709bdf
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import ch.qos.logback.classic.Level;
+import org.apache.drill.exec.client.LoggingResultsListener;
+import org.apache.drill.exec.util.CheckedSupplier;
+import org.apache.drill.exec.util.VectorUtil;
+
+import java.util.function.Supplier;
+
+/**
+ * <p>
+ *   This class contains utility methods to run lambda functions with the necessary {@link org.apache.drill.test.LogFixture}
+ *   boilerplate to print results to stdout for debugging purposes.
+ * </p>
+ *
+ * <p>
+ *   If you need to enable printing for more classes, simply add them to the {@link org.apache.drill.test.LogFixture}
+ *   constructed in {@link #printAndThrow(CheckedSupplier)}.
+ * </p>
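+ *
+ * <p>
+ *   A minimal illustrative sketch (the fixture and query names here are
+ *   hypothetical test code, not part of this class):
+ * </p>
+ * <pre>
+ *   // Runs the query and routes its logged results to stdout.
+ *   long rows = PrintingUtils.printAndThrow(() -> client.queryBuilder().sql(sql).log());
+ * </pre>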
+ */
+public final class PrintingUtils {
+
+  /**
+   * Enables printing to stdout for lambda functions that do not throw exceptions.
+   * @param supplier Lambda function to execute.
+   * @param <T> The return type of the lambda function.
+   * @return Data produced by the lambda function.
+   */
+  public static <T> T print(final Supplier<T> supplier) {
+    return printAndThrow(new CheckedSupplier<T, RuntimeException>() {
+      @Override
+      public T get() throws RuntimeException {
+        return supplier.get();
+      }
+    });
+  }
+
+  /**
+   * Enables printing to stdout for lambda functions that throw an exception.
+   * @param supplier Lambda function to execute.
+   * @param <T> Return type of the lambda function.
+   * @param <E> Type of exception thrown.
+   * @return Data produced by the lambda function.
+   * @throws E An exception.
+   */
+  public static <T, E extends Exception> T printAndThrow(CheckedSupplier<T, E> supplier) throws E {
+    try (LogFixture logFixture = new LogFixture.LogFixtureBuilder()
+      .rootLogger(Level.OFF)
+      // For some reason rootLogger(Level.OFF) is not sufficient.
+      .logger("org.apache.drill", Level.OFF) // Disable logging for Drill class we don't want
+      .logger(VectorUtil.class, Level.INFO)
+      .logger(LoggingResultsListener.class, Level.INFO)
+      .toConsole() // This redirects output to stdout
+      .build()) {
+      return supplier.get();
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index ff0e166..834e47b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -29,10 +29,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.client.PrintingResultsListener;
+import org.apache.drill.exec.client.LoggingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -496,6 +496,31 @@ public class QueryBuilder {
     return listener;
   }
 
+  public long logCsv() {
+    return log(Format.CSV);
+  }
+
+  public long log(Format format) {
+    return log(format, 20);
+  }
+
+  public long log(Format format, int colWidth) {
+    return runAndWait(new LoggingResultsListener(client.cluster().config(), format, colWidth));
+  }
+
+  /**
+   * <p>
+   *   Runs a query and logs the output in TSV format.
+   *   Similar to {@link QueryTestUtil#testRunAndLog} with one query.
+   * </p>
+   *
+   * @return The number of rows returned.
+   * @throws Exception If anything goes wrong with query execution.
+   */
+  public long log() throws Exception {
+    return log(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
+  }
+
   public long printCsv() {
     return print(Format.CSV);
   }
@@ -509,6 +534,19 @@ public class QueryBuilder {
   }
 
   /**
+   * <p>
+   *   Runs a query and prints the output to stdout in TSV format.
+   *   Similar to {@link QueryTestUtil#testRunAndPrint} with one query.
+   * </p>
+   *
+   * @return The number of rows returned.
+   * @throws Exception If anything goes wrong with query execution.
+   */
+  public long print() throws Exception {
+    return print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
+  }
+
+  /**
    * Run the query asynchronously, returning a future to be used
    * to check for query completion, wait for completion, and obtain
    * the result summary.
@@ -520,33 +558,14 @@ public class QueryBuilder {
     return future;
   }
 
-  /**
-   * Run a query and optionally print the output in TSV format.
-   * Similar to {@link QueryTestUtil#test} with one query. Output is printed
-   * only if the tests are running as verbose.
-   *
-   * @return the number of rows returned
-   * @throws Exception if anything goes wrong with query execution
-   */
-
-  public long print() throws Exception {
-    DrillConfig config = client.cluster().config( );
-
-    boolean verbose = !config.getBoolean(QueryTestUtil.TEST_QUERY_PRINTING_SILENT);
-
-    if (verbose) {
-      return print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
-    } else {
-      return run().recordCount();
-    }
-  }
-
   public long runAndWait(UserResultsListener listener) {
     AwaitableUserResultsListener resultListener =
         new AwaitableUserResultsListener(listener);
     withListener(resultListener);
     try {
       return resultListener.await();
+    } catch (UserRemoteException e) {
+      throw e;
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
index 96de2de..1596562 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryTestUtil.java
@@ -22,11 +22,10 @@ import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.drill.test.BaseTestQuery.SilentListener;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.client.PrintingResultsListener;
+import org.apache.drill.exec.client.LoggingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -47,9 +46,6 @@ import org.apache.drill.exec.util.VectorUtil;
  * Utilities useful for tests that issue SQL queries.
  */
 public class QueryTestUtil {
-
-  public static final String TEST_QUERY_PRINTING_SILENT = "drill.test.query.printing.silent";
-
   /**
    * Constructor. All methods are static.
    */
@@ -100,36 +96,41 @@ public class QueryTestUtil {
   }
 
   /**
-   * Execute a SQL query, and print the results.
+   * Execute a SQL query, and output the results.
    *
    * @param drillClient drill client to use
    * @param type type of the query
    * @param queryString query string
+   * @param print true to print results to stdout; false to log results
+   *
    * @return number of rows returned
-   * @throws Exception
+   * @throws Exception An error while running the query.
    */
-  public static int testRunAndPrint(
-      final DrillClient drillClient, final QueryType type, final String queryString) throws Exception {
+  private static int testRunAndOutput(final DrillClient drillClient,
+                                      final QueryType type,
+                                      final String queryString,
+                                      final boolean print) throws Exception {
     final String query = normalizeQuery(queryString);
     DrillConfig config = drillClient.getConfig();
     AwaitableUserResultsListener resultListener =
-        new AwaitableUserResultsListener(
-            config.getBoolean(TEST_QUERY_PRINTING_SILENT) ?
-                new SilentListener() :
-                new PrintingResultsListener(config, Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH)
-        );
+      new AwaitableUserResultsListener(print ?
+          new PrintingResultsListener(config, Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH) :
+          new LoggingResultsListener(config, Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH));
     drillClient.runQuery(type, query, resultListener);
     return resultListener.await();
   }
 
   /**
-   * Execute one or more queries separated by semicolons, and print the results.
+   * Execute one or more queries separated by semicolons, and output the results.
    *
    * @param drillClient drill client to use
    * @param queryString the query string
-   * @throws Exception
+   * @param print true to print results to stdout; false to log results
+   * @throws Exception An error while running the query.
    */
-  public static void test(final DrillClient drillClient, final String queryString) throws Exception{
+  public static void testRunAndOutput(final DrillClient drillClient,
+                                      final String queryString,
+                                      final boolean print) throws Exception {
     final String query = normalizeQuery(queryString);
     String[] queries = query.split(";");
     for (String q : queries) {
@@ -137,11 +138,79 @@ public class QueryTestUtil {
       if (trimmedQuery.isEmpty()) {
         continue;
       }
-      testRunAndPrint(drillClient, QueryType.SQL, trimmedQuery);
+      testRunAndOutput(drillClient, QueryType.SQL, trimmedQuery, print);
     }
   }
 
   /**
+   * Execute a SQL query, and log the results.
+   *
+   * @param drillClient drill client to use
+   * @param type type of the query
+   * @param queryString query string
+   * @return number of rows returned
+   * @throws Exception An error while running the query.
+   */
+  public static int testRunAndLog(final DrillClient drillClient,
+                                  final QueryType type,
+                                  final String queryString) throws Exception {
+    return testRunAndOutput(drillClient, type, queryString, false);
+  }
+
+  /**
+   * Execute one or more queries separated by semicolons, and log the results.
+   *
+   * @param drillClient drill client to use
+   * @param queryString the query string
+   * @throws Exception An error while running the queries.
+   */
+  public static void testRunAndLog(final DrillClient drillClient,
+                                   final String queryString) throws Exception {
+    testRunAndOutput(drillClient, queryString, false);
+  }
+
+  /**
+   * Execute one or more queries separated by semicolons, and log the results, with the option to
+   * add formatted arguments to the query string.
+   *
+   * @param drillClient drill client to use
+   * @param query the query string; may contain formatting specifications to be used by
+   *   {@link String#format(String, Object...)}.
+   * @param args optional args to use in the formatting call for the query string
+   * @throws Exception An error while running the query.
+   */
+  public static void testRunAndLog(final DrillClient drillClient, final String query, Object... args) throws Exception {
+    testRunAndLog(drillClient, String.format(query, args));
+  }
+
+  /**
+   * Execute a SQL query, and print the results.
+   *
+   * @param drillClient drill client to use
+   * @param type type of the query
+   * @param queryString query string
+   * @return number of rows returned
+   * @throws Exception An error while running the query.
+   */
+  public static int testRunAndPrint(final DrillClient drillClient,
+                                    final QueryType type,
+                                    final String queryString) throws Exception {
+    return testRunAndOutput(drillClient, type, queryString, true);
+  }
+
+  /**
+   * Execute one or more queries separated by semicolons, and print the results.
+   *
+   * @param drillClient drill client to use
+   * @param queryString the query string
+   * @throws Exception An error while running the queries.
+   */
+  public static void testRunAndPrint(final DrillClient drillClient,
+                                     final String queryString) throws Exception {
+    testRunAndOutput(drillClient, queryString, true);
+  }
+
+  /**
    * Execute one or more queries separated by semicolons, and print the results, with the option to
    * add formatted arguments to the query string.
    *
@@ -149,10 +218,10 @@ public class QueryTestUtil {
    * @param query the query string; may contain formatting specifications to be used by
    *   {@link String#format(String, Object...)}.
    * @param args optional args to use in the formatting call for the query string
-   * @throws Exception
+   * @throws Exception An error while running the query.
    */
-  public static void test(final DrillClient drillClient, final String query, Object... args) throws Exception {
-    test(drillClient, String.format(query, args));
+  public static void testRunAndPrint(final DrillClient drillClient, final String query, Object... args) throws Exception {
+    testRunAndPrint(drillClient, String.format(query, args));
   }
 
   /**
diff --git a/pom.xml b/pom.xml
index e195434..cbd27e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -721,7 +721,6 @@
               -Ddrill.exec.memory.enable_unsafe_bounds_check=true
               -Ddrill.exec.sys.store.provider.local.write=false
               -Dorg.apache.drill.exec.server.Drillbit.system_options="org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on"
-              -Ddrill.test.query.printing.silent=true
               -Ddrill.catastrophic_to_standard_out=true
               -XX:MaxDirectMemorySize=${directMemoryMb}M
               -Djava.net.preferIPv4Stack=true


[drill] 03/03: DRILL-6104: Add Log/Regex Format Plugin

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b1aca337f4e07ad34ad5662872b237bd7257e468
Author: Charles S. Givre <cg...@gmail.com>
AuthorDate: Wed Jul 18 10:44:42 2018 -0400

    DRILL-6104: Add Log/Regex Format Plugin
    
    closes #1114
---
 .../drill/exec/store/log/LogFormatConfig.java      | 119 ++++
 .../drill/exec/store/log/LogFormatField.java       |  86 +++
 .../drill/exec/store/log/LogFormatPlugin.java      |  84 +++
 .../drill/exec/store/log/LogRecordReader.java      | 764 +++++++++++++++++++++
 .../java/org/apache/drill/exec/store/log/README.md |  86 +++
 .../store/dfs/TestFormatPluginOptionExtractor.java |  14 +-
 .../apache/drill/exec/store/log/TestLogReader.java | 366 ++++++++++
 .../src/test/resources/regex/baddates.log2         |   5 +
 .../src/test/resources/regex/mysql.sqllog          |   6 +
 .../src/test/resources/regex/mysql.sqllog2         |   6 +
 .../java-exec/src/test/resources/regex/simple.log1 |   5 +
 .../java-exec/src/test/resources/regex/simple.log2 |   5 +
 pom.xml                                            |   5 +
 .../org/apache/drill/exec/proto/UserBitShared.java |  10 +
 14 files changed, 1555 insertions(+), 6 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
new file mode 100644
index 0000000..c3cf97e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.log;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Objects;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("logRegex")
+public class LogFormatConfig implements FormatPluginConfig {
+
+  private String regex;
+  private String extension;
+  private int maxErrors = 10;
+  private List<LogFormatField> schema;
+
+  public String getRegex() {
+    return regex;
+  }
+
+  public String getExtension() {
+    return extension;
+  }
+
+  public int getMaxErrors() {
+    return maxErrors;
+  }
+
+  public List<LogFormatField> getSchema() {
+    return schema;
+  }
+
+  //Setters
+  public void setExtension(String ext) {
+    this.extension = ext;
+  }
+
+  public void setMaxErrors(int errors) {
+    this.maxErrors = errors;
+  }
+
+  public void setRegex(String regex) {
+    this.regex = regex;
+  }
+
+  public void setSchema() {
+    this.schema = new ArrayList<LogFormatField>();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    LogFormatConfig other = (LogFormatConfig) obj;
+    return Objects.equal(regex, other.regex) &&
+        Objects.equal(maxErrors, other.maxErrors) &&
+        Objects.equal(schema, other.schema) &&
+        Objects.equal(extension, other.extension);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(new Object[]{regex, maxErrors, schema, extension});
+  }
+
+  @JsonIgnore
+  public List<String> getFieldNames() {
+    List<String> result = new ArrayList<String>();
+    if (this.schema == null) {
+      return result;
+    }
+
+    for (LogFormatField field : this.schema) {
+      result.add(field.getFieldName());
+    }
+    return result;
+  }
+
+  @JsonIgnore
+  public String getDataType(int fieldIndex) {
+    LogFormatField f = this.schema.get(fieldIndex);
+    return f.getFieldType().toUpperCase();
+  }
+
+  @JsonIgnore
+  public LogFormatField getField(int fieldIndex) {
+    return this.schema.get(fieldIndex);
+  }
+
+  @JsonIgnore
+  public String getDateFormat(int patternIndex) {
+    LogFormatField f = this.schema.get(patternIndex);
+    return f.getFormat();
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
new file mode 100644
index 0000000..64a6db7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.log;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("regexReaderFieldDescription")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class LogFormatField {
+
+  /*
+   * The three configuration options for a field are:
+   * 1.  The field name
+   * 2.  The data type (fieldType), which defaults to VARCHAR if it is not specified
+   * 3.  The format string, which is used for date/time fields and is ignored for
+   *     all other field types.
+   */
+
+  private String fieldName = "";
+  private String fieldType = "VARCHAR";
+  private String format;
+
+  //These will be used in the future for field validation and masking
+  //public String validator;
+  //public double minValue;
+  //public double maxValue;
+
+
+  public LogFormatField() {
+  }
+
+  //These constructors are used for unit testing
+  public LogFormatField(String fieldName) {
+    this(fieldName, null, null);
+  }
+
+  public LogFormatField(String fieldName, String fieldType) {
+    this(fieldName, fieldType, null);
+  }
+
+  public LogFormatField(String fieldName, String fieldType, String format) {
+    this.fieldName = fieldName;
+    this.fieldType = fieldType;
+    this.format = format;
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+
+  public String getFieldType() {
+    return fieldType;
+  }
+
+  public String getFormat() {
+    return format;
+  }
+
+
+  /*
+  public String getValidator() { return validator; }
+
+  public double getMinValue() { return minValue; }
+
+  public double getMaxValue() {
+    return maxValue;
+  }
+  */
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
new file mode 100644
index 0000000..d2e6772
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.log;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
+
+  public static final String DEFAULT_NAME = "logRegex";
+  private final LogFormatConfig formatConfig;
+
+  public LogFormatPlugin(String name, DrillbitContext context,
+                         Configuration fsConf, StoragePluginConfig storageConfig,
+                         LogFormatConfig formatConfig) {
+    super(name, context, fsConf, storageConfig, formatConfig,
+        true,  // readable
+        false, // writable
+        true, // blockSplittable
+        true,  // compressible
+        Lists.newArrayList(formatConfig.getExtension()),
+        DEFAULT_NAME);
+    this.formatConfig = formatConfig;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context,
+                                      DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns,
+                                      String userName) throws ExecutionSetupException {
+    return new LogRecordReader(context, dfs, fileWork,
+        columns, userName, formatConfig);
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+    return true;
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context,
+                                      EasyWriter writer) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+    return UserBitShared.CoreOperatorType.REGEX_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
new file mode 100644
index 0000000..58f07f2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.log;
+
+import com.google.common.base.Charsets;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableSmallIntVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
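+/**
+ * Record reader for the log/regex format plugin.  Matches each line of the
+ * input file against the configured regex and loads the capture groups into
+ * value vectors according to the configured schema.
+ */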
+public class LogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
+
+  private abstract static class ColumnDefn {
+    private final String name;
+    private final int index;
+    private final String format;
+
+    public ColumnDefn(String name, int index) {
+      this(name, index, null);
+    }
+
+    public ColumnDefn(String name, int index, String format) {
+      this.name = name;
+      this.index = index;
+      this.format = format;
+    }
+
+    public abstract void define(OutputMutator outputMutator) throws SchemaChangeException;
+
+    public abstract void load(int rowIndex, String value);
+
+    public String getName() { return this.name; }
+
+    public int getIndex() { return this.index; }
+
+    public String getFormat() { return this.format;}
+
+    @Override
+    //For testing
+    public String toString() {
+      return "Name: " + name + ", Index: " + index + ", Format: " + format;
+    }
+  }
+
+  private static class VarCharDefn extends ColumnDefn {
+
+    private NullableVarCharVector.Mutator mutator;
+
+    public VarCharDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.VARCHAR));
+      mutator = outputMutator.addField(field, NullableVarCharVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      mutator.set(rowIndex, value.getBytes());
+    }
+  }
+
+  private static class BigIntDefn extends ColumnDefn {
+
+    private NullableBigIntVector.Mutator mutator;
+
+    public BigIntDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.BIGINT));
+      mutator = outputMutator.addField(field, NullableBigIntVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        mutator.set(rowIndex, Long.parseLong(value));
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an INT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class SmallIntDefn extends ColumnDefn {
+
+    private NullableSmallIntVector.Mutator mutator;
+
+    public SmallIntDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.SMALLINT));
+      mutator = outputMutator.addField(field, NullableSmallIntVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        mutator.set(rowIndex, Short.parseShort(value));
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an INT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class IntDefn extends ColumnDefn {
+
+    private NullableIntVector.Mutator mutator;
+
+    public IntDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.INT));
+      mutator = outputMutator.addField(field, NullableIntVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        mutator.set(rowIndex, Integer.parseInt(value));
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an INT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class Float4Defn extends ColumnDefn {
+
+    private NullableFloat4Vector.Mutator mutator;
+
+    public Float4Defn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.FLOAT4));
+      mutator = outputMutator.addField(field, NullableFloat4Vector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        mutator.set(rowIndex, Float.parseFloat(value));
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an FLOAT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class DoubleDefn extends ColumnDefn {
+
+    private NullableFloat8Vector.Mutator mutator;
+
+    public DoubleDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.FLOAT8));
+      mutator = outputMutator.addField(field, NullableFloat8Vector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        mutator.set(rowIndex, Double.parseDouble(value));
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an FLOAT field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class DateDefn extends ColumnDefn {
+
+    private NullableDateVector.Mutator mutator;
+    private SimpleDateFormat df;
+
+    public DateDefn(String name, int index, String dateFormat) {
+      super(name, index, dateFormat);
+      df = getValidDateObject(dateFormat);
+    }
+
+    private SimpleDateFormat getValidDateObject(String d) {
+      SimpleDateFormat tempDateFormat;
+      if (d != null && !d.isEmpty()) {
+        tempDateFormat = new SimpleDateFormat(d);
+      } else {
+        throw UserException.parseError()
+            .message("Invalid date format.  The date formatting string was empty.")
+            .build(logger);
+      }
+      return tempDateFormat;
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.DATE));
+      mutator = outputMutator.addField(field, NullableDateVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        Date d = df.parse(value);
+        long milliseconds = d.getTime();
+        mutator.set(rowIndex, milliseconds);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an DATE field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      } catch (ParseException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Date Format String does not match field value.")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Format String", getFormat())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class TimeDefn extends ColumnDefn {
+
+    private NullableTimeVector.Mutator mutator;
+    private SimpleDateFormat df;
+
+    public TimeDefn(String name, int index, String dateFormat) {
+      super(name, index, dateFormat);
+      df = getValidDateObject(dateFormat);
+    }
+
+    private SimpleDateFormat getValidDateObject(String d) {
+      SimpleDateFormat tempDateFormat;
+      if (d != null && !d.isEmpty()) {
+        tempDateFormat = new SimpleDateFormat(d);
+      } else {
+        throw UserException.parseError()
+            .message("Invalid date format.  The date formatting string was empty.")
+            .build(logger);
+      }
+      return tempDateFormat;
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.TIME));
+      mutator = outputMutator.addField(field, NullableTimeVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        Date d = df.parse(value);
+        int milliseconds = (int) d.getTime();
+        mutator.set(rowIndex, milliseconds);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse an Time field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      } catch (ParseException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Date Format String does not match field value.")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Format String", getFormat())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static class TimeStampDefn extends ColumnDefn {
+
+    private NullableTimeStampVector.Mutator mutator;
+    private SimpleDateFormat df;
+
+    public TimeStampDefn(String name, int index, String dateFormat) {
+      super(name, index, dateFormat);
+      df = getValidDateObject(dateFormat);
+    }
+
+    private SimpleDateFormat getValidDateObject(String d) {
+      SimpleDateFormat tempDateFormat;
+      if (d != null && !d.isEmpty()) {
+        tempDateFormat = new SimpleDateFormat(d);
+      } else {
+        throw UserException.parseError()
+            .message("Invalid date format.  The date formatting string was empty.")
+            .build(logger);
+      }
+      return tempDateFormat;
+    }
+
+    @Override
+    public void define(OutputMutator outputMutator) throws SchemaChangeException {
+      MaterializedField field = MaterializedField.create(getName(),
+          Types.optional(MinorType.TIMESTAMP));
+      mutator = outputMutator.addField(field, NullableTimeStampVector.class).getMutator();
+    }
+
+    @Override
+    public void load(int rowIndex, String value) {
+      try {
+        Date d = df.parse(value);
+        long milliseconds = d.getTime();
+        mutator.set(rowIndex, milliseconds);
+      } catch (NumberFormatException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Failed to parse a TIMESTAMP field")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Value", value)
+            .build(logger);
+      } catch (ParseException e) {
+        throw UserException
+            .dataReadError(e)
+            .addContext("Date Format String does not match field value.")
+            .addContext("Column", getName())
+            .addContext("Position", getIndex())
+            .addContext("Format String", getFormat())
+            .addContext("Value", value)
+            .build(logger);
+      }
+    }
+  }
+
+  private static final int BATCH_SIZE = BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+  private final DrillFileSystem dfs;
+  private final FileWork fileWork;
+  private final String userName;
+  private final LogFormatConfig formatConfig;
+  private ColumnDefn[] columns;
+  private Pattern pattern;
+  private BufferedReader reader;
+  private int rowIndex;
+  private int capturingGroups;
+  private OutputMutator outputMutator;
+  private int unmatchedColumnIndex;
+  private int unmatchedRowIndex;
+  private boolean unmatchedRows;
+  private int maxErrors;
+  private int errorCount;
+
+  public LogRecordReader(FragmentContext context, DrillFileSystem dfs,
+                         FileWork fileWork, List<SchemaPath> columns, String userName,
+                         LogFormatConfig formatConfig) {
+    this.dfs = dfs;
+    this.fileWork = fileWork;
+    this.userName = userName;
+    this.formatConfig = formatConfig;
+    this.unmatchedColumnIndex = -1;
+    this.unmatchedRowIndex = 0;
+    this.unmatchedRows = false;
+    this.maxErrors = formatConfig.getMaxErrors();
+
+    // Ask the superclass to parse the projection list.
+    setColumns(columns);
+
+    if (maxErrors < 0) {
+      throw UserException
+          .validationError()
+          .message("Max Errors must be a positive integer greater than zero.")
+          .build(logger);
+    }
+
+  }
+
+  @Override
+  public void setup(final OperatorContext context, final OutputMutator output) {
+    this.outputMutator = output;
+
+    setupPattern();
+    openFile();
+    setupProjection();
+    defineVectors();
+  }
+
+  private void setupPattern() {
+    try {
+      this.pattern = Pattern.compile(this.formatConfig.getRegex());
+      Matcher m = pattern.matcher("test");
+      capturingGroups = m.groupCount();
+    } catch (PatternSyntaxException e) {
+      throw UserException
+          .validationError(e)
+          .message("Failed to parse regex: \"%s\"", formatConfig.getRegex())
+          .build(logger);
+    }
+  }
+
+  private void setupProjection() {
+    if (isSkipQuery()) {
+      projectNone();
+    } else if (isStarQuery()) {
+      projectAll();
+    } else {
+      projectSubset();
+    }
+  }
+
+  private void projectNone() {
+    columns = new ColumnDefn[]{new VarCharDefn("dummy", -1)};
+  }
+
+  private void openFile() {
+    InputStream in;
+    try {
+      in = dfs.open(new Path(fileWork.getPath()));
+    } catch (Exception e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Failed to open open input file: %s", fileWork.getPath())
+          .addContext("User name", userName)
+          .build(logger);
+    }
+    reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
+  }
+
+  private void projectAll() {
+    List<String> fields = formatConfig.getFieldNames();
+    for (int i = fields.size(); i < capturingGroups; i++) {
+      fields.add("field_" + i);
+    }
+    columns = new ColumnDefn[capturingGroups];
+
+    for (int i = 0; i < capturingGroups; i++) {
+      columns[i] = makeColumn(fields.get(i), i);
+    }
+  }
+
+  private void projectSubset() {
+    Collection<SchemaPath> project = this.getColumns();
+    assert !project.isEmpty();
+    columns = new ColumnDefn[project.size()];
+
+    List<String> fields = formatConfig.getFieldNames();
+    int colIndex = 0;
+
+
+    for (SchemaPath column : project) {
+      if (column.getAsNamePart().hasChild()) {
+        throw UserException
+            .validationError()
+            .message("The log format plugin supports only simple columns")
+            .addContext("Projected column", column.toString())
+            .build(logger);
+      }
+
+      String name = column.getAsNamePart().getName();
+
+      //Need this to retrieve unnamed fields
+      Pattern r = Pattern.compile("^field_(\\d+)$");
+      Matcher m = r.matcher(name);
+      int patternIndex = -1;
+
+      if (name.equals("_unmatched_rows")) {
+        //Set boolean flag to true
+        this.unmatchedRows = true;
+        this.unmatchedColumnIndex = colIndex;
+      } else if (m.find()) {
+        //if no fields are defined in the configuration, then all the fields have names of 'field_n'
+        //Therefore n is the column index
+        patternIndex = Integer.parseInt(m.group(1));
+      } else {
+        for (int i = 0; i < fields.size(); i++) {
+          if (fields.get(i).equalsIgnoreCase(name) ||
+              fields.get(i).equals("_raw") ||
+              fields.get(i).equals("_unmatched_rows")
+              ) {
+            patternIndex = i;
+
+            break;
+          }
+        }
+      }
+      columns[colIndex++] = makeColumn(name, patternIndex);
+    }
+
+  }
+
+  private ColumnDefn makeColumn(String name, int patternIndex) {
+    String typeName = null;
+    if (patternIndex <= -1 || formatConfig.getSchema() == null) {
+      // Use VARCHAR for missing columns
+      // (instead of Drill standard of nullable int)
+      typeName = MinorType.VARCHAR.name();
+    } else if (patternIndex < formatConfig.getSchema().size()) {
+      LogFormatField tempField = formatConfig.getField(patternIndex);
+      typeName = tempField.getFieldType().toUpperCase();
+    }
+    if (typeName == null) {
+      // No type name. VARCHAR is a safe guess
+      typeName = MinorType.VARCHAR.name();
+    }
+    if (name.equals("_raw") || name.equals("_unmatched_rows")) {
+      return new VarCharDefn(name, patternIndex);
+    }
+
+    MinorType type = MinorType.valueOf(typeName);
+    switch (type) {
+      case VARCHAR:
+        return new VarCharDefn(name, patternIndex);
+      case INT:
+        return new IntDefn(name, patternIndex);
+      case SMALLINT:
+        return new SmallIntDefn(name, patternIndex);
+      case BIGINT:
+        return new BigIntDefn(name, patternIndex);
+      case FLOAT4:
+        return new Float4Defn(name, patternIndex);
+      case FLOAT8:
+        return new DoubleDefn(name, patternIndex);
+      case DATE:
+        return new DateDefn(name, patternIndex, formatConfig.getDateFormat(patternIndex));
+      case TIMESTAMP:
+        return new TimeStampDefn(name, patternIndex, formatConfig.getDateFormat(patternIndex));
+      case TIME:
+        return new TimeDefn(name, patternIndex, formatConfig.getDateFormat(patternIndex));
+      default:
+        throw UserException
+            .validationError()
+            .message("Undefined column types")
+            .addContext("Position", patternIndex)
+            .addContext("Field name", name)
+            .addContext("Type", typeName)
+            .build(logger);
+    }
+  }
+
+  private void defineVectors() {
+    for (int i = 0; i < columns.length; i++) {
+      try {
+        columns[i].define(outputMutator);
+      } catch (SchemaChangeException e) {
+        throw UserException
+            .systemError(e)
+            .message("Vector creation failed")
+            .build(logger);
+      }
+    }
+  }
+
+  @Override
+  public int next() {
+    rowIndex = 0;
+    // Consume lines until the batch fills or the input is exhausted.
+    while (nextLine()) {
+    }
+    return rowIndex;
+  }
+
+  private boolean nextLine() {
+    String line;
+    try {
+      line = reader.readLine();
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Error reading file:")
+          .addContext("File", fileWork.getPath())
+          .build(logger);
+    }
+
+    if (line == null) {
+      return false;
+    }
+    Matcher lineMatcher = pattern.matcher(line);
+    if (lineMatcher.matches()) {
+      loadVectors(lineMatcher);
+      return rowIndex < BATCH_SIZE;
+    }
+
+    errorCount++;
+    if (errorCount <= maxErrors) {
+      logger.warn("Unmatched line: {}", line);
+    } else {
+      throw UserException.parseError()
+          .message("Too many errors.  Max error threshold exceeded.")
+          .addContext("Line", line)
+          .addContext("Line number", rowIndex)
+          .build(logger);
+    }
+    //If the user asked for the unmatched columns display them
+    if (unmatchedRows) {
+      //If the user asked for the unmatched columns AND other columns
+      if (columns.length > 1) {
+        columns[unmatchedColumnIndex].load(rowIndex, line);
+        rowIndex++;
+        return rowIndex < BATCH_SIZE;
+      } else {
+        //If the user ONLY asked for the unmatched columns
+        columns[unmatchedColumnIndex].load(unmatchedRowIndex, line);
+        unmatchedRowIndex++;
+        rowIndex = unmatchedRowIndex;
+        return unmatchedRowIndex < BATCH_SIZE;
+      }
+    }
+
+    return true;
+  }
+
+  private void loadVectors(Matcher m) {
+    String value = null;
+
+    for (int i = 0; i < columns.length; i++) {
+      //Skip the unmatched rows column
+      if (columns[i].name.equals("_unmatched_rows")) {
+        continue;
+      }
+
+      if (columns[i].index >= 0) {
+        //Get the value of the regex group
+        value = m.group(columns[i].index + 1);
+
+        //If the value is not null, assign it to the column
+        if (value != null) {
+          columns[i].load(rowIndex, value);
+        }
+      } else if (columns[i].name.equals("_raw")) {
+        //Special case: the query contains the _raw column
+        value = m.group(0);
+        if (value != null) {
+          columns[i].load(rowIndex, value);
+        } else {
+          rowIndex++;
+        }
+      }
+    }
+    rowIndex++;
+  }
+
+  @Override
+  public void close() {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        logger.warn("Error when closing file: " + fileWork.getPath(), e);
+      }
+      reader = null;
+    }
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
new file mode 100644
index 0000000..9110c55
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
@@ -0,0 +1,86 @@
+# Drill Regex/Logfile Plugin
+Plugin for Apache Drill that allows Drill to read and query arbitrary files whose schema can be defined by a regex.  The original intent was for this to be used for log files; however, it can be used for any structured data.
+
+## Example Use Case:  MySQL Log
+If you wanted to analyze log files such as the MySQL log sample shown below using Drill, you might manage with various string functions, or you could write a UDF specific to this data; however, both approaches are time-consuming, difficult, and not reusable.
+
+```
+070823 21:00:32       1 Connect     root@localhost on test1
+070823 21:00:48       1 Query       show tables
+070823 21:00:56       1 Query       select * from category
+070917 16:29:01      21 Query       select * from location
+070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1
+```
+This plugin allows you to configure Drill to directly query log files of nearly any structure.
+
+## Configuration Options
+* **`type`**:  This tells Drill which format plugin to use.  In this case, it must be `logRegex`.  This field is mandatory.
+* **`regex`**:  This is the regular expression which defines how the log file lines will be split.  You must enclose the parts of the regex that you wish to extract in grouping parentheses.  Note that this plugin uses Java regular expressions and requires that shortcuts such as `\d` have an additional backslash: i.e. `\\d`.  This field is mandatory.
+* **`extension`**:  This option tells Drill which file extensions should be mapped to this configuration.  Note that you can have multiple configurations of this plugin to allow you to query various log files.  This field is mandatory.
+* **`maxErrors`**:  Log files can be inconsistent and messy.  The `maxErrors` variable allows you to set how many errors the reader will ignore before halting execution and throwing an error.  Defaults to 10.
+* **`schema`**:  The `schema` field is where you define the structure of the log file.  This section is optional.  If you do not define a schema, all fields will be assigned a column name of `field_n` where `n` is the index of the field. The undefined fields will be assigned a default data type of `VARCHAR`.
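+
+For illustration, the sketch below shows a minimal configuration with no `schema` section (the extension and regex here are made up for the example).  With this configuration, the two captured groups would be exposed as `field_0` and `field_1`, both as `VARCHAR`:
+
+```
+"simple" : {
+      "type" : "logRegex",
+      "extension" : "log3",
+      "regex" : "(\\w+)\\s+(.+)"
+   }
+```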
+
+### Defining a Schema
+The `schema` variable is a JSON array of fields, each of which currently supports three possible attributes:
+* **`fieldName`**:  This is the name of the field.
+* **`fieldType`**:  Defines the data type.  Defaults to `VARCHAR` if undefined. At the time of writing, the reader supports: `VARCHAR`, `INT`, `SMALLINT`, `BIGINT`, `FLOAT4`, `FLOAT8`, `DATE`, `TIMESTAMP`, `TIME`.
+* **`format`**: Defines the format for date/time fields.  This is mandatory if the field is a date/time field.
+
+In the future, it is my hope that the schema section will allow for data masking, validation and other transformations that are commonly used for analysis of log files.
+
+### Example Configuration:
+The configuration below demonstrates how to configure Drill to query the example MySQL log file shown above.
+
+
+```
+"log" : {
+      "type" : "logRegex",
+      "extension" : "log",
+      "regex" : "(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)",
+      "maxErrors": 10,
+      "schema": [
+        {
+          "fieldName": "eventDate",
+          "fieldType": "DATE",
+          "format": "yyMMdd"
+        },
+        {
+          "fieldName": "eventTime",
+          "fieldType": "TIME",
+          "format": "HH:mm:ss"
+        },
+        {
+          "fieldName": "PID",
+          "fieldType": "INT"
+        },
+        {
+          "fieldName": "action"
+        },
+        {
+          "fieldName": "query"
+        }
+      ]
+   }
+ ```
+
+
+## Example Usage
+
+This format plugin gives you two options for querying fields.  If you define the fields, you can query them as you would any other data source.  If you do not define a field in the `schema` variable, Drill will extract all fields and give them the name `field_n`.  The fields are indexed from `0`.  Therefore, if you have a dataset with 5 fields, the following query would be valid:
+
+```
+SELECT field_0, field_1, field_2, field_3, field_4
+FROM ..
+```
+
+### Implicit Fields
+In addition to the fields which the user defines, the format plugin has two implicit fields which can be useful for debugging your regex.  These fields do not appear in `SELECT *` queries and will only be retrieved when explicitly included in a query.
+
+* **`_raw`**:  This field returns the complete lines which matched your regex.
+* **`_unmatched_rows`**:  This field returns rows which **did not** match the regex.  Note: this field ONLY returns the unmatched rows, so if you have a data file of 10 lines, 8 of which match, `SELECT _unmatched_rows` will return 2 rows.  If, however, you combine this with another field, such as `_raw`, then `_unmatched_rows` will be `null` for rows that match the regex and will have a value for rows that do not.
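+
+As a sketch of how the two implicit fields combine, you might run:
+
+```
+SELECT _raw, _unmatched_rows
+FROM ..
+```
+
+In the result, rows that match the regex populate `_raw` and leave `_unmatched_rows` as `null`; rows that do not match do the reverse.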
+
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 8b73b53..f51fe4c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -17,11 +17,7 @@
  */
 package org.apache.drill.exec.store.dfs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Collection;
-
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.RunTimeScan;
 import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -29,7 +25,10 @@ import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
 import org.apache.drill.exec.store.image.ImageFormatConfig;
 import org.junit.Test;
 
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 
 public class TestFormatPluginOptionExtractor {
@@ -72,6 +71,9 @@ public class TestFormatPluginOptionExtractor {
               "(type: String, fileSystemMetadata: boolean, descriptive: boolean, timeZone: String)", d.presentParams()
           );
           break;
+        case "logRegex":
+          assertEquals(d.typeName, "(type: String, regex: String, extension: String, maxErrors: int, schema: List)", d.presentParams());
+          break;
         default:
           fail("add validation for format plugin type " + d.typeName);
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
new file mode 100644
index 0000000..0df615e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.log;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestLogReader extends ClusterTest {
+
+  public static final String DATE_ONLY_PATTERN = "(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*";
+
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // Define a regex format config for testing.
+
+    defineRegexPlugin();
+  }
+
+  private static void defineRegexPlugin() throws ExecutionSetupException {
+
+    // Create an instance of the regex config.
+    // Note: we can't use the ".log" extension; the Drill .gitignore
+    // file ignores such files, so they'll never get committed. Instead,
+    // make up a fake suffix.
+
+    LogFormatConfig sampleConfig = new LogFormatConfig();
+    sampleConfig.setExtension("log1");
+    sampleConfig.setRegex(DATE_ONLY_PATTERN);
+
+    sampleConfig.setSchema();
+    sampleConfig.getSchema().add(new LogFormatField("year", "INT"));
+    sampleConfig.getSchema().add(new LogFormatField("month", "INT"));
+    sampleConfig.getSchema().add(new LogFormatField("day", "INT"));
+
+    // Full Drill log parser definition.
+
+    LogFormatConfig logConfig = new LogFormatConfig();
+    logConfig.setExtension("log1");
+    logConfig.setRegex("(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) " +
+        "(\\d\\d):(\\d\\d):(\\d\\d),\\d+ " +
+        "\\[([^]]*)] (\\w+)\\s+(\\S+) - (.*)");
+
+    logConfig.setSchema();
+    logConfig.getSchema().add(new LogFormatField("year", "INT"));
+    logConfig.getSchema().add(new LogFormatField("month", "INT"));
+    logConfig.getSchema().add(new LogFormatField("day", "INT"));
+    logConfig.getSchema().add(new LogFormatField("hour", "INT"));
+    logConfig.getSchema().add(new LogFormatField("minute", "INT"));
+    logConfig.getSchema().add(new LogFormatField("second", "INT"));
+    logConfig.getSchema().add(new LogFormatField("thread"));
+    logConfig.getSchema().add(new LogFormatField("level"));
+    logConfig.getSchema().add(new LogFormatField("module"));
+    logConfig.getSchema().add(new LogFormatField("message"));
+
+    //Set up additional configs to check the time/date formats
+    LogFormatConfig logDateConfig = new LogFormatConfig();
+    logDateConfig.setExtension("log2");
+    logDateConfig.setRegex("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}),(\\d+)\\s\\[(\\w+)\\]\\s([A-Z]+)\\s(.+)");
+
+    logDateConfig.setSchema();
+    logDateConfig.getSchema().add(new LogFormatField("entry_date", "TIMESTAMP", "yy-MM-dd hh:mm:ss"));
+    logDateConfig.getSchema().add(new LogFormatField("pid", "INT"));
+    logDateConfig.getSchema().add(new LogFormatField("location"));
+    logDateConfig.getSchema().add(new LogFormatField("message_type"));
+    logDateConfig.getSchema().add(new LogFormatField("message"));
+
+    logDateConfig.setMaxErrors(3);
+
+    LogFormatConfig mysqlLogConfig = new LogFormatConfig();
+    mysqlLogConfig.setExtension("sqllog");
+    mysqlLogConfig.setRegex("(\\d{6})\\s(\\d{2}:\\d{2}:\\d{2})\\s+(\\d+)\\s(\\w+)\\s+(.+)");
+
+    // Define a temporary format plugin for the "cp" storage plugin.
+    Drillbit drillbit = cluster.drillbit();
+    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
+    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp");
+    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
+    pluginConfig.getFormats().put("sample", sampleConfig);
+    pluginConfig.getFormats().put("drill-log", logConfig);
+    pluginConfig.getFormats().put("date-log",logDateConfig);
+    pluginConfig.getFormats().put( "mysql-log", mysqlLogConfig);
+    pluginRegistry.createOrUpdate("cp", pluginConfig, false);
+
+  }
+
+  @Test
+  public void testWildcard() throws RpcException {
+    String sql = "SELECT * FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("year", MinorType.INT)
+        .addNullable("month", MinorType.INT)
+        .addNullable("day", MinorType.INT)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(2017, 12, 17)
+        .addRow(2017, 12, 18)
+        .addRow(2017, 12, 19)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testExplicit() throws RpcException {
+    String sql = "SELECT `day`, `month` FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("day", MinorType.INT)
+        .addNullable("month", MinorType.INT)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(17, 12)
+        .addRow(18, 12)
+        .addRow(19, 12)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testMissing() throws RpcException {
+    String sql = "SELECT `day`, `missing`, `month` FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("day", MinorType.INT)
+        .addNullable("missing", MinorType.VARCHAR)
+        .addNullable("month", MinorType.INT)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(17, null, 12)
+        .addRow(18, null, 12)
+        .addRow(19, null, 12)
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testRaw() throws RpcException {
+    String sql = "SELECT `_raw` FROM cp.`regex/simple.log1`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_raw", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("2017-12-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.")
+        .addRow("2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.")
+        .addRow("2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testDate() throws RpcException {
+    String sql = "SELECT TYPEOF(`entry_date`) AS entry_date FROM cp.`regex/simple.log2` LIMIT 1";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("entry_date", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("TIMESTAMP")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+
+  }
+
+  @Test
+  public void testCount() throws RpcException {
+    String sql = "SELECT COUNT(*) FROM cp.`regex/simple.log1`";
+    long result = client.queryBuilder().sql(sql).singletonLong();
+    assertEquals(3, result);
+  }
+
+  @Test
+  public void testFull() throws RpcException {
+    String sql = "SELECT * FROM cp.`regex/simple.log1`";
+    client.queryBuilder().sql(sql).printCsv();
+  }
+
+  //This section tests log queries without a defined schema
+  @Test
+  public void testStarQueryNoSchema() throws RpcException {
+    String sql = "SELECT * FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("field_0", MinorType.VARCHAR)
+        .addNullable("field_1", MinorType.VARCHAR)
+        .addNullable("field_2", MinorType.VARCHAR)
+        .addNullable("field_3", MinorType.VARCHAR)
+        .addNullable("field_4", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823", "21:00:32", "1", "Connect", "root@localhost on test1")
+        .addRow("070823", "21:00:48", "1", "Query", "show tables")
+        .addRow("070823", "21:00:56", "1", "Query", "select * from category")
+        .addRow("070917", "16:29:01", "21", "Query", "select * from location")
+        .addRow("070917", "16:29:12", "21", "Query", "select * from location where id = 1 LIMIT 1")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testAllFieldsQueryNoSchema() throws RpcException {
+    String sql = "SELECT field_0, field_1, field_2, field_3, field_4 FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("field_0", MinorType.VARCHAR)
+        .addNullable("field_1", MinorType.VARCHAR)
+        .addNullable("field_2", MinorType.VARCHAR)
+        .addNullable("field_3", MinorType.VARCHAR)
+        .addNullable("field_4", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823", "21:00:32", "1", "Connect", "root@localhost on test1")
+        .addRow("070823", "21:00:48", "1", "Query", "show tables")
+        .addRow("070823", "21:00:56", "1", "Query", "select * from category" )
+        .addRow("070917", "16:29:01", "21", "Query","select * from location" )
+        .addRow("070917", "16:29:12", "21", "Query","select * from location where id = 1 LIMIT 1" )
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSomeFieldsQueryNoSchema() throws RpcException {
+    String sql = "SELECT field_0, field_4 FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("field_0", MinorType.VARCHAR)
+        .addNullable("field_4", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823", "root@localhost on test1")
+        .addRow("070823",  "show tables")
+        .addRow("070823",  "select * from category" )
+        .addRow("070917",  "select * from location" )
+        .addRow("070917", "select * from location where id = 1 LIMIT 1" )
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testRawNoSchema() throws RpcException {
+    String sql = "SELECT _raw FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_raw", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823 21:00:32       1 Connect     root@localhost on test1")
+        .addRow("070823 21:00:48       1 Query       show tables")
+        .addRow("070823 21:00:56       1 Query       select * from category" )
+        .addRow("070917 16:29:01      21 Query       select * from location" )
+        .addRow("070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1" )
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testUMNoSchema() throws RpcException {
+    String sql = "SELECT _unmatched_rows FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_unmatched_rows", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testRawUMNoSchema() throws RpcException {
+    String sql = "SELECT _raw, _unmatched_rows FROM cp.`regex/mysql.sqllog`";
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .addNullable("_raw", MinorType.VARCHAR)
+        .addNullable("_unmatched_rows", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow("070823 21:00:32       1 Connect     root@localhost on test1", null)
+        .addRow("070823 21:00:48       1 Query       show tables", null)
+        .addRow("070823 21:00:56       1 Query       select * from category", null )
+        .addRow("070917 16:29:01      21 Query       select * from location", null )
+        .addRow("070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1", null )
+        .addRow( null, "dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/regex/baddates.log2 b/exec/java-exec/src/test/resources/regex/baddates.log2
new file mode 100644
index 0000000..64b6b77
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/baddates.log2
@@ -0,0 +1,5 @@
+2017-14-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.
+2017-15-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.
+Base Configuration:
+    - jar:file:/foo/apache-drill-1.13.0-SNAPSHOT/jars/drill-common-1.13.0-SNAPSHOT.jar!/drill-default.conf
+2017-16-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.
diff --git a/exec/java-exec/src/test/resources/regex/mysql.sqllog b/exec/java-exec/src/test/resources/regex/mysql.sqllog
new file mode 100644
index 0000000..3d1b128
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/mysql.sqllog
@@ -0,0 +1,6 @@
+070823 21:00:32       1 Connect     root@localhost on test1
+070823 21:00:48       1 Query       show tables
+070823 21:00:56       1 Query       select * from category
+070917 16:29:01      21 Query       select * from location
+070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1
+dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/regex/mysql.sqllog2 b/exec/java-exec/src/test/resources/regex/mysql.sqllog2
new file mode 100644
index 0000000..3d1b128
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/mysql.sqllog2
@@ -0,0 +1,6 @@
+070823 21:00:32       1 Connect     root@localhost on test1
+070823 21:00:48       1 Query       show tables
+070823 21:00:56       1 Query       select * from category
+070917 16:29:01      21 Query       select * from location
+070917 16:29:12      21 Query       select * from location where id = 1 LIMIT 1
+dfadkfjaldkjafsdfjlksdjflksjdlkfjsldkfjslkjl
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/regex/simple.log1 b/exec/java-exec/src/test/resources/regex/simple.log1
new file mode 100644
index 0000000..06df7be
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/simple.log1
@@ -0,0 +1,5 @@
+2017-12-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.
+2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.
+Base Configuration:
+    - jar:file:/foo/apache-drill-1.13.0-SNAPSHOT/jars/drill-common-1.13.0-SNAPSHOT.jar!/drill-default.conf
+2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.
diff --git a/exec/java-exec/src/test/resources/regex/simple.log2 b/exec/java-exec/src/test/resources/regex/simple.log2
new file mode 100644
index 0000000..06df7be
--- /dev/null
+++ b/exec/java-exec/src/test/resources/regex/simple.log2
@@ -0,0 +1,5 @@
+2017-12-17 10:52:41,820 [main] INFO  o.a.d.e.e.f.FunctionImplementationRegistry - Function registry loaded.  459 functions loaded in 1396 ms.
+2017-12-18 10:52:37,652 [main] INFO  o.a.drill.common.config.DrillConfig - Configuration and plugin file(s) identified in 115ms.
+Base Configuration:
+    - jar:file:/foo/apache-drill-1.13.0-SNAPSHOT/jars/drill-common-1.13.0-SNAPSHOT.jar!/drill-default.conf
+2017-12-19 11:12:27,278 [main] ERROR o.apache.drill.exec.server.Drillbit - Failure during initial startup of Drillbit.
diff --git a/pom.xml b/pom.xml
index cbd27e8..c2447aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -323,6 +323,11 @@
         <configuration>
           <excludeSubprojects>false</excludeSubprojects>
           <excludes>
+            <!-- Types log1, log2, sqllog and sqllog2 are used to test he logRegex format plugin. -->
+            <exclude>**/*.log1</exclude>
+            <exclude>**/*.log2</exclude>
+            <exclude>**/*.sqllog</exclude>
+            <exclude>**/*.sqllog2</exclude>
             <exclude>**/*.log</exclude>
             <exclude>**/*.css</exclude>
             <exclude>**/*.js</exclude>
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 7162ead..64703c3 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -545,6 +545,10 @@ public final class UserBitShared {
      * <code>JDBC_SCAN = 44;</code>
      */
     JDBC_SCAN(44, 44),
+    /**
+     * <code>REGEX_SUB_SCAN = 45;</code>
+     */
+    REGEX_SUB_SCAN(45,45),
     ;
 
     /**
@@ -727,6 +731,11 @@ public final class UserBitShared {
      * <code>JDBC_SCAN = 44;</code>
      */
     public static final int JDBC_SCAN_VALUE = 44;
+    /**
+     * <code>REGEX_SUB_SCAN = 45;</code>
+     */
+    public static final int REGEX_SUB_SCAN_VALUE = 45;
+
 
 
     public final int getNumber() { return value; }
@@ -778,6 +787,7 @@ public final class UserBitShared {
         case 42: return UNNEST;
         case 43: return HIVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN;
         case 44: return JDBC_SCAN;
+        case 45: return REGEX_SUB_SCAN;
         default: return null;
       }
     }


[drill] 02/03: DRILL-6475: Unnest: Null fieldId Pointer.

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5a0c75f69166283a17179e80b97712bed57110d9
Author: HanumathRao <ha...@gmail.com>
AuthorDate: Fri Jun 29 08:46:41 2018 -0700

    DRILL-6475: Unnest: Null fieldId Pointer.
    
    closes #1381
---
 .../visitor/AdjustOperatorsSchemaVisitor.java      | 148 +++++++++++++++++++++
 .../physical/visitor/JoinPrelRenameVisitor.java    |  87 ------------
 .../planner/sql/handlers/DefaultSqlHandler.java    |   5 +-
 .../impl/lateraljoin/TestLateralPlans.java         |  24 ++++
 4 files changed, 175 insertions(+), 89 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java
new file mode 100644
index 0000000..c46b725
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.physical.JoinPrel;
+import org.apache.drill.exec.planner.physical.LateralJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.calcite.rel.RelNode;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
+
+/**
+ * AdjustOperatorsSchemaVisitor visits operators and, depending upon their functionality,
+ * adjusts their output row types. The adjusting mechanism is unique to each operator. In the case of joins, this
+ * visitor adjusts the field names to make sure that the upstream operator sees only unique field names, even when
+ * the children of the join have the same field names. In the case of lateral/unnest operators, it changes the
+ * correlated field and also the unnest operator's output row type.
+ */
+public class AdjustOperatorsSchemaVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{
+
+  private Prel registeredPrel = null;
+
+  private static AdjustOperatorsSchemaVisitor INSTANCE = new AdjustOperatorsSchemaVisitor();
+
+  public static Prel adjustSchema(Prel prel){
+    return prel.accept(INSTANCE, null);
+  }
+
+  private void register(Prel prel) {
+    this.registeredPrel = prel;
+  }
+
+  private Prel getRegisteredPrel() {
+    return this.registeredPrel;
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+    return preparePrel(prel, getChildren(prel));
+  }
+
+  public void unRegister() {
+    this.registeredPrel = null;
+  }
+
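+  /**
+   * Visits the children of the given prel. While visiting the child at index
+   * {@code registerForChild}, this prel is registered so that descendant
+   * operators (such as unnest) can refer back to it; an index of -1 registers
+   * nothing.
+   */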
+  private List<RelNode> getChildren(Prel prel, int registerForChild) {
+    int ch = 0;
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      if (ch == registerForChild) {
+        register(prel);
+      }
+      child = child.accept(this, null);
+      if (ch == registerForChild) {
+        unRegister();
+      }
+      children.add(child);
+      ch++;
+    }
+    return children;
+  }
+
+  private List<RelNode> getChildren(Prel prel) {
+    return getChildren(prel, -1);
+  }
+
+  private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) {
+    return (Prel) prel.copy(prel.getTraitSet(), renamedNodes);
+  }
+
+  @Override
+  public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
+
+    List<RelNode> children = getChildren(prel);
+
+    final int leftCount = children.get(0).getRowType().getFieldCount();
+
+    List<RelNode> reNamedChildren = Lists.newArrayList();
+
+    RelNode left = prel.getJoinInput(0, children.get(0));
+    RelNode right = prel.getJoinInput(leftCount, children.get(1));
+
+    reNamedChildren.add(left);
+    reNamedChildren.add(right);
+
+    return preparePrel(prel, reNamedChildren);
+  }
+
+  @Override
+  public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeException {
+
+    List<RelNode> children = getChildren(prel, 1);
+    List<RelNode> reNamedChildren = new ArrayList<>();
+
+    for (int i = 0; i < children.size(); i++) {
+      reNamedChildren.add(prel.getLateralInput(i, children.get(i)));
+    }
+
+    return preparePrel(prel, reNamedChildren);
+  }
+
+  @Override
+  public Prel visitUnnest(UnnestPrel prel, Void value) throws RuntimeException {
+    Preconditions.checkArgument(registeredPrel != null && registeredPrel instanceof LateralJoinPrel);
+    Preconditions.checkArgument(prel.getRowType().getFieldCount() == 1);
+    RexBuilder builder = prel.getCluster().getRexBuilder();
+
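+    // Find the correlated column on the lateral join's left input, then rebuild
+    // the field access and rename the unnest output to that column's name.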
+    LateralJoinPrel lateralJoinPrel = (LateralJoinPrel) getRegisteredPrel();
+    int correlationIndex = lateralJoinPrel.getRequiredColumns().nextSetBit(0);
+    String correlationColumnName = lateralJoinPrel.getLeft().getRowType().getFieldNames().get(correlationIndex);
+    RexNode corrRef = builder.makeCorrel(lateralJoinPrel.getLeft().getRowType(), lateralJoinPrel.getCorrelationId());
+    RexNode fieldAccess = builder.makeFieldAccess(corrRef, correlationColumnName, false);
+
+    List<String> fieldNames = new ArrayList<>();
+    List<RelDataType> fieldTypes = new ArrayList<>();
+    for (RelDataTypeField field : prel.getRowType().getFieldList()) {
+      fieldNames.add(correlationColumnName);
+      fieldTypes.add(field.getType());
+    }
+
+    UnnestPrel unnestPrel = new UnnestPrel(prel.getCluster(), prel.getTraitSet(),
+            prel.getCluster().getTypeFactory().createStructType(fieldTypes, fieldNames), fieldAccess);
+    return unnestPrel;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
deleted file mode 100644
index 3a2529b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.physical.visitor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.exec.planner.physical.JoinPrel;
-import org.apache.drill.exec.planner.physical.LateralJoinPrel;
-import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.calcite.rel.RelNode;
-
-import com.google.common.collect.Lists;
-
-public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{
-
-  private static JoinPrelRenameVisitor INSTANCE = new JoinPrelRenameVisitor();
-
-  public static Prel insertRenameProject(Prel prel){
-    return prel.accept(INSTANCE, null);
-  }
-
-  @Override
-  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
-    return preparePrel(prel, getChildren(prel));
-  }
-
-  private List<RelNode> getChildren(Prel prel) {
-    List<RelNode> children = Lists.newArrayList();
-    for(Prel child : prel){
-      child = child.accept(this, null);
-      children.add(child);
-    }
-    return children;
-  }
-
-  private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) {
-    return (Prel) prel.copy(prel.getTraitSet(), renamedNodes);
-  }
-
-  @Override
-  public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {
-
-    List<RelNode> children = getChildren(prel);
-
-    final int leftCount = children.get(0).getRowType().getFieldCount();
-
-    List<RelNode> reNamedChildren = Lists.newArrayList();
-
-    RelNode left = prel.getJoinInput(0, children.get(0));
-    RelNode right = prel.getJoinInput(leftCount, children.get(1));
-
-    reNamedChildren.add(left);
-    reNamedChildren.add(right);
-
-    return preparePrel(prel, reNamedChildren);
-  }
-
-  //TODO: consolidate this code with join column renaming.
-  @Override
-  public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeException {
-
-    List<RelNode> children = getChildren(prel);
-    List<RelNode> reNamedChildren = new ArrayList<>();
-
-    for (int i = 0; i < children.size(); i++) {
-      reNamedChildren.add(prel.getLateralInput(i, children.get(i)));
-    }
-
-    return preparePrel(prel, reNamedChildren);
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 1e671ff..83e1a8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -88,11 +88,11 @@ import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
+import org.apache.drill.exec.planner.physical.visitor.AdjustOperatorsSchemaVisitor;
 import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
 import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
 import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
 import org.apache.drill.exec.planner.physical.visitor.InsertLocalExchangeVisitor;
-import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor;
 import org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
 import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten;
@@ -512,8 +512,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      * 2.)
      * Join might cause naming conflicts from its left and right child.
      * In such case, we have to insert Project to rename the conflicting names.
+     * The Unnest operator might need to adjust the correlated field after physical planning.
      */
-    phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode);
+    phyRelNode = AdjustOperatorsSchemaVisitor.adjustSchema(phyRelNode);
 
     /*
      * 2.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 77d245f..222b036 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -358,6 +358,7 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testNoExchangeWithStreamAggWithGrpBy() throws Exception {
     String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," +
             " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+
     ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
             .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
             .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
@@ -532,4 +533,27 @@ public class TestLateralPlans extends BaseTestQuery {
           plan, not(containsString("Sort")));
     }
   }
+
+  @Test
+  public void testMultiUnnestQuery() throws Exception {
+    String Sql = "SELECT t5.l_quantity FROM dfs.`lateraljoin/multipleFiles` t, " +
+            "LATERAL (SELECT t2.ordrs.o_lineitems FROM UNNEST(t.c_orders) t2(ordrs)) t3(lineitems), " +
+            "LATERAL (SELECT t4.lineitems.l_quantity FROM UNNEST(t3.lineitems) t4(lineitems)) t5(l_quantity) order by 1";
+
+    String baselineQuery = "select dt.lineitems.l_quantity as l_quantity from (select flatten(dt.orders.o_lineitems) as lineitems " +
+            "from (select flatten(c_orders) as orders from dfs.`lateraljoin/multipleFiles` t) dt)dt order by 1";
+
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+      .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+      .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      client.testBuilder()
+              .ordered()
+              .sqlBaselineQuery(baselineQuery)
+              .sqlQuery(Sql)
+              .go();
+    }
+  }
 }