Posted to commits@drill.apache.org by so...@apache.org on 2018/07/13 03:44:52 UTC

[drill] branch master updated (a77fd14 -> eb90ebd)

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from a77fd14  DRILL-6516: EMIT support in streaming agg
     new c644367  DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified
     new cfe61eb  DRILL-6578: Handle query cancellation in Parquet reader
     new 80fb761  DRILL-6560: Enhanced the batch statistics logging enablement
     new d4f3304  DRILL-6559: Travis timing out
     new a97cce3  DRILL-6346: Create an Official Drill Docker Container
     new c396ae7  DRILL-6596: Fix fillEmpties and set methods for Nullable variable length vectors to not use emptyByteArray
     new cad9aad  DRILL-6592: Unnest record batch size is called too frequently
     new 56f951c  DRILL-6579: Added sanity checks to the Parquet reader to avoid infinite loops
     new bd4049d  [DRILL-6581] C++ Client SSL Implementation Fixes/Improvements
     new b1eb9d7  [DRILL-6586]  Add SSL Hostname verification with zookeeper connection mode support
     new 94186fc  [DRILL-6587] Added support for custom SSL CTX Options
     new 4168e1e  DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data
     new eb90ebd  DRILL-6601 LageFileCompilation testProject times out

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml                                        |   2 +-
 .../common/exceptions/DrillRuntimeException.java   |  18 ++
 contrib/native/client/example/querySubmitter.cpp   |   7 +-
 contrib/native/client/src/clientlib/channel.cpp    |  54 +++---
 contrib/native/client/src/clientlib/channel.hpp    | 120 +++++++++++++-
 contrib/native/client/src/clientlib/errmsgs.cpp    |   3 +
 contrib/native/client/src/clientlib/errmsgs.hpp    |   5 +-
 .../native/client/src/clientlib/userProperties.cpp |   1 +
 contrib/native/client/src/include/drill/common.hpp |   3 +-
 .../client/src/include/drill/userProperties.hpp    |   3 +-
 contrib/pom.xml                                    |   9 +
 .../drill/exec/store/mongo/MongoTestSuit.java      |   4 +
 .../exec/store/mongo/TestMongoChunkAssignment.java |   3 +-
 .../cpProtofiles.sh => distribution/Dockerfile     |  34 ++--
 distribution/pom.xml                               |  28 ++++
 docs/dev/Docker.md                                 |  97 +++++++++++
 .../java/org/apache/drill/exec/ExecConstants.java  |   3 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |  11 +-
 .../impl/project/OutputWidthExpression.java        |  17 +-
 .../physical/impl/project/OutputWidthVisitor.java  |   2 +-
 .../impl/project/OutputWidthVisitorState.java      |   7 +-
 .../impl/project/ProjectMemoryManager.java         |  48 +++---
 .../physical/impl/project/ProjectRecordBatch.java  |  22 ++-
 .../drill/exec/physical/impl/unnest/Unnest.java    |   2 +
 .../exec/physical/impl/unnest/UnnestImpl.java      |  20 +++
 .../physical/impl/unnest/UnnestRecordBatch.java    |  70 +++++---
 .../exec/server/options/SystemOptionManager.java   |   5 +-
 .../store/parquet/columnreaders/BatchReader.java   |   4 +-
 .../parquet/columnreaders/ParquetRecordReader.java |   3 +-
 .../parquet/columnreaders/VarLenBinaryReader.java  |  18 +-
 .../columnreaders/VarLenBulkPageReader.java        |   7 +-
 .../columnreaders/VarLenEntryDictionaryReader.java |   3 +
 .../parquet/columnreaders/VarLenEntryReader.java   |   3 +
 .../columnreaders/VarLenFixedEntryReader.java      |   9 +-
 .../VarLenNullableDictionaryReader.java            |   3 +
 .../columnreaders/VarLenNullableEntryReader.java   |   3 +
 .../VarLenNullableFixedEntryReader.java            |   6 +-
 .../columnreaders/VarLenOverflowReader.java        |   3 +
 .../batchsizing/OverflowSerDeUtil.java             |  12 +-
 .../batchsizing/RecordBatchOverflow.java           |  17 +-
 .../batchsizing/RecordBatchSizerManager.java       |  45 +++--
 .../drill/exec/util/record/RecordBatchStats.java   | 181 ++++++++++++++-------
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../test/java/org/apache/drill/TestTpchLimit0.java |   4 +-
 .../java/org/apache/drill/TestTpchSingleMode.java  |   3 +
 .../exec/compile/TestLargeFileCompilation.java     |   2 +-
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  51 ++++++
 .../codegen/templates/NullableValueVectors.java    |  22 +--
 .../codegen/templates/VariableLengthVectors.java   |   4 +-
 pom.xml                                            |   1 +
 50 files changed, 735 insertions(+), 268 deletions(-)
 copy contrib/native/client/scripts/cpProtofiles.sh => distribution/Dockerfile (54%)
 create mode 100644 docs/dev/Docker.md


[drill] 05/13: DRILL-6346: Create an Official Drill Docker Container

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a97cce34e9f7ae9d522bf0542a984160d95a787c
Author: Abhishek Girish <ag...@apache.org>
AuthorDate: Mon Jun 25 13:01:04 2018 -0700

    DRILL-6346: Create an Official Drill Docker Container
    
    closes #1348
---
 distribution/Dockerfile | 35 ++++++++++++++++++
 distribution/pom.xml    | 28 ++++++++++++++
 docs/dev/Docker.md      | 97 +++++++++++++++++++++++++++++++++++++++++++++++++
 pom.xml                 |  1 +
 4 files changed, 161 insertions(+)

diff --git a/distribution/Dockerfile b/distribution/Dockerfile
new file mode 100755
index 0000000..1ea1e456
--- /dev/null
+++ b/distribution/Dockerfile
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM centos:7
+
+# Project version defined in pom.xml is passed as an argument
+ARG VERSION
+
+# JDK 8 is a prerequisite to run Drill; the 'which' package is needed for drill-config.sh
+RUN yum install -y java-1.8.0-openjdk-devel which ; yum clean all ; rm -rf /var/cache/yum
+
+# The drill tarball is generated upon building the Drill project
+COPY target/apache-drill-$VERSION.tar.gz /tmp
+
+# Drill binaries are extracted into the '/opt/drill' directory
+RUN mkdir /opt/drill
+RUN tar -xvzf /tmp/apache-drill-$VERSION.tar.gz --directory=/opt/drill --strip-components 1
+
+# Starts Drill in embedded mode and connects to Sqlline
+ENTRYPOINT /opt/drill/bin/drill-embedded
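
For reference, the image this Dockerfile produces can also be built and run by hand, without the Maven plugin wired up below. A minimal sketch, assuming a prior `mvn clean install` has produced the tarball under distribution/target and that 1.14.0 is the project version:

```
# Build from the distribution module, passing the version the COPY step expects
$ cd distribution
$ docker build --build-arg VERSION=1.14.0 -t drill/apache-drill:1.14.0 .

# The ENTRYPOINT starts Drill in embedded mode (sqlline)
$ docker run -it drill/apache-drill:1.14.0
```
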
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 796b02e..b23fe89 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -485,6 +485,34 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>docker</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>com.spotify</groupId>
+            <artifactId>dockerfile-maven-plugin</artifactId>
+            <version>1.4.3</version>
+            <executions>
+              <execution>
+                <id>docker-image</id>
+                <goals>
+                  <goal>build</goal>
+                  <goal>push</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <repository>${docker.repository}</repository>
+              <tag>${project.version}</tag>
+              <buildArgs>
+                <VERSION>${project.version}</VERSION>
+              </buildArgs>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 
 </project>
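
The profile above wires the Spotify dockerfile-maven-plugin so that `<tag>` tracks the project version and `<buildArgs>` feeds the Dockerfile's `ARG VERSION`. A usage sketch against this configuration; resolving the repository from the `docker.repository` property (added to the root pom.xml below) is how the plugin is configured here, and the override flag is a standard Maven property switch:

```
$ cd distribution
# Build the image tagged ${docker.repository}:${project.version}
$ mvn dockerfile:build -Pdocker
# Push it, optionally overriding the target repository property
$ mvn dockerfile:push -Pdocker -Ddocker.repository=<my_repo>
```
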
diff --git a/docs/dev/Docker.md b/docs/dev/Docker.md
new file mode 100644
index 0000000..acfea48
--- /dev/null
+++ b/docs/dev/Docker.md
@@ -0,0 +1,97 @@
+# How to build, publish, and run an Apache Drill Docker image
+
+## Prerequisites
+
+   To build an Apache Drill Docker image, you need the following software installed on your system.
+  * [Java 8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html)
+  * [Maven 3.3.1 or greater](https://maven.apache.org/download.cgi)
+  * [Docker CE](https://store.docker.com/search?type=edition&offering=community)
+  
+   If you are using an older Mac or PC, additionally configure [docker-machine](https://docs.docker.com/machine/overview/#what-is-docker-machine) on your system.
+
+## Checkout
+```
+git clone https://github.com/apache/drill.git
+```
+## Build Drill
+```
+$ cd drill
+$ mvn clean install
+```   
+## Build Docker Image
+```
+$ cd distribution
+$ mvn dockerfile:build -Pdocker
+```
+## Push Docker Image
+   
+   By default, the Docker image built above is configured to be pushed to [Drill Docker Hub](https://hub.docker.com/r/drill/apache-drill/) to create official Drill Docker images.
+```   
+$ cd distribution
+$ mvn dockerfile:push -Pdocker
+```    
+  You can configure the repository in pom.xml to point to any private or public container registry, or override it on the mvn command line.
+```  
+$ cd distribution
+$ mvn dockerfile:push -Pdocker -Ddocker.repository=<my_repo>
+```
+## Run Docker Container
+   
+   Running the Docker container should start Drill in embedded mode and connect to Sqlline. 
+```    
+$ docker run -i --name drill-1.14.0 -p 8047:8047 -t drill/apache-drill:1.14.0 /bin/bash
+Jun 29, 2018 3:28:21 AM org.glassfish.jersey.server.ApplicationHandler initialize
+INFO: Initiating Jersey application, version Jersey: 2.8 2014-04-29 01:25:26...
+apache drill 1.14.0 
+"json ain't no thang"
+0: jdbc:drill:zk=local> select version from sys.version;
++------------+
+|  version   |
++------------+
+| 1.14.0     |
++------------+
+1 row selected (0.28 seconds)
+```  
+
+   You can also run the container in detached mode and connect to Sqlline using the drill-localhost script.
+```    
+$ docker run -i --name drill-1.14.0 -p 8047:8047 --detach -t drill/apache-drill:1.14.0 /bin/bash
+<displays container ID>
+
+$ docker exec -it drill-1.14.0 bash
+<connects to container>
+
+$ /opt/drill/bin/drill-localhost
+apache drill 1.14.0 
+"json ain't no thang"
+0: jdbc:drill:drillbit=localhost> select version from sys.version;
++------------+
+|  version   |
++------------+
+| 1.14.0     |
++------------+
+1 row selected (0.28 seconds)
+```
+
+## Querying Data
+
+   By default, you can only query files that are accessible within the container, such as the sample data that ships with Drill.
+```
+> select first_name, last_name from cp.`employee.json` limit 1;
++-------------+------------+
+| first_name  | last_name  |
++-------------+------------+
+| Sheri       | Nowmer     |
++-------------+------------+
+1 row selected (0.256 seconds)
+```
+
+   To query files outside of the container, you can configure [Docker volumes](https://docs.docker.com/storage/volumes/#start-a-service-with-volumes).
+   
+## Drill Web UI
+
+   The Drill Web UI can be accessed at http://localhost:8047 once the Drill Docker container is up and running. On Windows, you may need to specify the IP address of your system instead of 'localhost'.
+
+## More information 
+
+   For more information, including how to run Apache Drill in a Docker container, visit the [Apache Drill Documentation](http://drill.apache.org/docs/).
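
To make the volume-mount note in Docker.md above concrete, a hedged sketch; the host path, mount point, and data file are placeholders:

```
# Expose a host directory to Drill inside the container
$ docker run -it -v /path/to/data:/data drill/apache-drill:1.14.0
0: jdbc:drill:zk=local> select * from dfs.`/data/sample.json` limit 1;
```
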
diff --git a/pom.xml b/pom.xml
index 56582c3..e195434 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
     <additionalparam>-Xdoclint:none</additionalparam>
     <rat.skip>true</rat.skip>
     <license.skip>true</license.skip>
+    <docker.repository>drill/apache-drill</docker.repository>
   </properties>
 
   <scm>


[drill] 08/13: DRILL-6579: Added sanity checks to the Parquet reader to avoid infinite loops

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 56f951cc7a03e497ba34eeca4bd9265ae30c4650
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Mon Jul 2 22:04:20 2018 -0700

    DRILL-6579: Added sanity checks to the Parquet reader to avoid infinite loops
    
    closes #1361
---
 .../drill/exec/store/parquet/columnreaders/BatchReader.java      | 4 ++--
 .../exec/store/parquet/columnreaders/VarLenBinaryReader.java     | 3 ++-
 .../exec/store/parquet/columnreaders/VarLenBulkPageReader.java   | 7 +++++--
 .../store/parquet/columnreaders/VarLenEntryDictionaryReader.java | 3 +++
 .../exec/store/parquet/columnreaders/VarLenEntryReader.java      | 3 +++
 .../exec/store/parquet/columnreaders/VarLenFixedEntryReader.java | 9 +++++----
 .../parquet/columnreaders/VarLenNullableDictionaryReader.java    | 3 +++
 .../store/parquet/columnreaders/VarLenNullableEntryReader.java   | 3 +++
 .../parquet/columnreaders/VarLenNullableFixedEntryReader.java    | 6 ++++--
 .../exec/store/parquet/columnreaders/VarLenOverflowReader.java   | 3 +++
 10 files changed, 33 insertions(+), 11 deletions(-)
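
The recurring pattern in this commit replaces `assert` statements, which are no-ops unless the JVM runs with -ea, with Guava `Preconditions` checks that fail fast in production. A minimal before/after sketch; the helper class and method names are illustrative, not from the diff:

```
import com.google.common.base.Preconditions;

final class ReadBatchGuard {
  // Mirrors the checks added across the var-length readers.
  static int computeReadBatch(int maxEntries, int valuesToRead) {
    final int readBatch = Math.min(maxEntries, valuesToRead);
    // Before: `assert readBatch > 0 : "..."` -- silently skipped in production,
    // so a zero-sized batch could spin the reader in an infinite loop.
    // After: an always-on check that surfaces the bad state immediately.
    Preconditions.checkState(readBatch > 0,
        "Read batch count [%s] should be greater than zero", readBatch);
    return readBatch;
  }
}
```
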

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
index 25dfbc8..f5825e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -40,9 +40,9 @@ public abstract class BatchReader {
     ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
     int currBatchNumRecords = readState.batchSizerMgr().getCurrentRecordsPerBatch();
     long recordsToRead = Math.min(currBatchNumRecords, readState.getRemainingValuesToRead());
-    int readCount = readRecords(firstColumnStatus, recordsToRead);
-
+    int readCount = recordsToRead > 0 ? readRecords(firstColumnStatus, recordsToRead) : 0;
     readState.fillNullVectors(readCount);
+
     return readCount;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 1fb224d..cba2a79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -111,7 +112,7 @@ public class VarLenBinaryReader {
 
       // Read the column data
       int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
-      assert readColumns <= batchNumRecords : "Reader cannot return more values than requested..";
+      Preconditions.checkState(readColumns <= batchNumRecords, "Reader cannot return more values than requested..");
 
       if (!overflowCondition) {
         if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index 0e50406..81b7264 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -106,11 +107,13 @@ final class VarLenBulkPageReader {
     pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
     pageInfo.numPageValues = pageInfoInput.numPageValues;
     if (clear) {
-    buffer.clear();
-  }
+      buffer.clear();
+    }
   }
 
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
+    Preconditions.checkArgument(valuesToRead > 0, "Number of values to read [%s] should be greater than zero", valuesToRead);
+
     VarLenColumnBulkEntry entry = null;
 
     // If there is overflow data, then we need to consume it first
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
index 8ba7ac4..7d76263 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
@@ -50,6 +51,8 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
     final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
     final int[] valueLengths = entry.getValuesLength();
     final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
     final byte[] tgtBuff = entry.getInternalDataArray();
     final int tgtLen = tgtBuff.length;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
index d95050d..cec0c7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
@@ -51,6 +52,8 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
 
     final int[] valueLengths = entry.getValuesLength();
     final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
     final byte[] tgtBuff = entry.getInternalDataArray();
     final byte[] srcBuff = buffer.array();
     final int srcLen = buffer.remaining();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
index e8dc15f..a6e7077 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
@@ -32,19 +33,19 @@ final class VarLenFixedEntryReader extends VarLenAbstractPageEntryReader {
     VarLenColumnBulkInputCallback containerCallback) {
 
     super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+    Preconditions.checkArgument(columnPrecInfo.precision >= 0, "Fixed length precision [%s] cannot be lower than zero", columnPrecInfo.precision);
   }
 
   /** {@inheritDoc} */
   @Override
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
-    assert columnPrecInfo.precision >= 0 : "Fixed length precision cannot be lower than zero";
-
     load(true); // load new data to process
 
     final int expectedDataLen = columnPrecInfo.precision;
     final int entrySz = 4 + columnPrecInfo.precision;
-    final int maxValues = Math.min(entry.getMaxEntries(), (pageInfo.pageDataLen - pageInfo.pageDataOff) / entrySz);
-    final int readBatch = Math.min(maxValues, valuesToRead);
+    final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
     final int[] valueLengths = entry.getValuesLength();
     final byte[] tgtBuff = entry.getInternalDataArray();
     final byte[] srcBuff = buffer.array();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
index f7b6dce..e33919f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
@@ -52,6 +53,8 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
     final int[] valueLengths = entry.getValuesLength();
     final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
     final byte[] tgtBuff = entry.getInternalDataArray();
     final int tgtLen = tgtBuff.length;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
index 7ffb27a..ce39859 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
@@ -53,6 +54,8 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
 
     final int[] valueLengths = entry.getValuesLength();
     final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
     final byte[] tgtBuff = entry.getInternalDataArray();
     final byte[] srcBuff = buffer.array();
     final int srcLen = buffer.remaining();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
index 98089fd..3869113 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
@@ -33,19 +34,20 @@ final class VarLenNullableFixedEntryReader extends VarLenAbstractPageEntryReader
     VarLenColumnBulkInputCallback containerCallback) {
 
     super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+    Preconditions.checkArgument(columnPrecInfo.precision >= 0, "Fixed length precision cannot be lower than zero");
   }
 
   /** {@inheritDoc} */
   @Override
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
-    assert columnPrecInfo.precision >= 0 : "Fixed length precision cannot be lower than zero";
-
     // TODO - We should not use force reload for sparse columns (values with lot of nulls)
     load(true); // load new data to process
 
     final int expectedDataLen = columnPrecInfo.precision;
     final int entrySz = 4 + columnPrecInfo.precision;
     final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
     final int[] valueLengths = entry.getValuesLength();
     final byte[] tgtBuff = entry.getInternalDataArray();
     final byte[] srcBuff = buffer.array();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java
index cacd5c8..6c8891f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
@@ -80,6 +81,8 @@ public final class VarLenOverflowReader extends VarLenAbstractEntryReader {
     // load some overflow data for processing
     final int maxValues = Math.min(entry.getMaxEntries(), valuesToRead);
     final int numAvailableValues = overflowDataCache.load(overflowState.currValueIdx, maxValues);
+    Preconditions.checkState(numAvailableValues > 0, "Number of values to read [%s] should be greater than zero", numAvailableValues);
+
     final int firstValueDataOffset = getDataBufferStartOffset() + adjustDataOffset(overflowState.currValueIdx);
     int totalDataLen = 0;
     int currValueIdx = overflowState.currValueIdx;


[drill] 09/13: [DRILL-6581] C++ Client SSL Implementation Fixes/Improvements

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit bd4049dc657e2f74d69abd7289482a57ea1d98cc
Author: superbstreak <ro...@gmail.com>
AuthorDate: Thu Jul 5 18:11:47 2018 -0700

    [DRILL-6581] C++ Client SSL Implementation Fixes/Improvements
---
 contrib/native/client/src/clientlib/channel.cpp    |  45 +++++----
 contrib/native/client/src/clientlib/channel.hpp    | 108 ++++++++++++++++++++-
 contrib/native/client/src/clientlib/errmsgs.cpp    |   3 +
 contrib/native/client/src/clientlib/errmsgs.hpp    |   5 +-
 .../client/src/include/drill/userProperties.hpp    |   3 +-
 5 files changed, 137 insertions(+), 27 deletions(-)
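
The heart of this change is a callback that wraps boost's stock `rfc2818_verification` so the verification outcome can be recorded and later used to classify handshake failures; this is what `DrillSSLHostnameVerifier` in channel.hpp does. A standalone sketch of that wrapping pattern, with names outside the diff being illustrative:

```
#include <string>
#include <boost/asio/ssl.hpp>

// Stand-in for the status-recording wrapper: any callable with this
// signature can be passed to ssl::context::set_verify_callback.
struct RecordingVerifier {
    boost::asio::ssl::rfc2818_verification m_verifier;
    bool* m_pStatus; // where the verification outcome is recorded

    bool operator()(bool preverified, boost::asio::ssl::verify_context& ctx) {
        const bool verified = m_verifier(preverified, ctx);
        *m_pStatus = verified; // remembered for later error classification
        return verified;
    }
};

void configureVerification(boost::asio::ssl::context& sslCtx,
                           const std::string& host, bool* status) {
    sslCtx.set_verify_mode(boost::asio::ssl::verify_peer);
    sslCtx.set_verify_callback(
        RecordingVerifier{boost::asio::ssl::rfc2818_verification(host), status});
}
```
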

diff --git a/contrib/native/client/src/clientlib/channel.cpp b/contrib/native/client/src/clientlib/channel.cpp
index 535fad7..fc97816 100644
--- a/contrib/native/client/src/clientlib/channel.cpp
+++ b/contrib/native/client/src/clientlib/channel.cpp
@@ -352,30 +352,37 @@ connectionStatus_t SSLStreamChannel::init(){
     connectionStatus_t ret=CONN_SUCCESS;
 
     const DrillUserProperties* props = m_pContext->getUserProperties();
-	std::string useSystemTrustStore;
-	props->getProp(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
-	if (useSystemTrustStore != "true"){
-		std::string certFile;
-		props->getProp(USERPROP_CERTFILEPATH, certFile);
-		try{
-			((SSLChannelContext_t*)m_pContext)->getSslContext().load_verify_file(certFile);
-		}
-		catch (boost::system::system_error e){
-			DRILL_LOG(LOG_ERROR) << "Channel initialization failure. Certificate file  "
-				<< certFile
-				<< " could not be loaded."
-				<< std::endl;
-			handleError(CONN_SSLERROR, getMessage(ERR_CONN_SSLCERTFAIL, certFile.c_str(), e.what()));
-			ret = CONN_FAILURE;
-		}
-	}
+    std::string useSystemTrustStore;
+    props->getProp(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
+    if (useSystemTrustStore != "true"){
+        std::string certFile;
+        props->getProp(USERPROP_CERTFILEPATH, certFile);
+        try{
+            ((SSLChannelContext_t*)m_pContext)->getSslContext().load_verify_file(certFile);
+        }
+        catch (boost::system::system_error e){
+            DRILL_LOG(LOG_ERROR) << "Channel initialization failure. Certificate file  "
+                << certFile
+                << " could not be loaded."
+                << std::endl;
+            handleError(CONN_SSLERROR, getMessage(ERR_CONN_SSLCERTFAIL, certFile.c_str(), e.what()));
+            ret = CONN_FAILURE;
+            // Stop here to propagate the load/verify certificate error.
+            return ret;
+        }
+    }
 
+    ((SSLChannelContext_t *)m_pContext)->SetCertHostnameVerificationStatus(true);
     std::string disableHostVerification;
     props->getProp(USERPROP_DISABLE_HOSTVERIFICATION, disableHostVerification);
     if (disableHostVerification != "true") {
-        std::string hostPortStr = m_pEndpoint->getHost() + ":" + m_pEndpoint->getPort();
+        // Populate endpoint information before we retrieve host name.
+        m_pEndpoint->parseConnectString();
+        std::string hostStr  = m_pEndpoint->getHost();
         ((SSLChannelContext_t *) m_pContext)->getSslContext().set_verify_callback(
-                boost::asio::ssl::rfc2818_verification(hostPortStr.c_str()));
+                DrillSSLHostnameVerifier(
+                    ((SSLChannelContext_t *)m_pContext), 
+                    boost::asio::ssl::rfc2818_verification(hostStr.c_str())));
     }
 
     m_pSocket=new SslSocket(m_ioService, ((SSLChannelContext_t*)m_pContext)->getSslContext() );
diff --git a/contrib/native/client/src/clientlib/channel.hpp b/contrib/native/client/src/clientlib/channel.hpp
index 73273aa..e739118 100644
--- a/contrib/native/client/src/clientlib/channel.hpp
+++ b/contrib/native/client/src/clientlib/channel.hpp
@@ -21,6 +21,13 @@
 #include "drill/common.hpp"
 #include "drill/drillClient.hpp"
 #include "streamSocket.hpp"
+#include "errmsgs.hpp"
+
+namespace
+{
+// The error message to indicate certificate verification failure.
+#define DRILL_BOOST_SSL_CERT_VERIFY_FAILED  "handshake: certificate verify failed\0"
+}
 
 namespace Drill {
 
@@ -34,14 +41,13 @@ class UserProperties;
 
             //parse the connection string and set up the host and port to connect to
             connectionStatus_t getDrillbitEndpoint();
-
+            void parseConnectString();
             const std::string& getProtocol() const {return m_protocol;}
             const std::string& getHost() const {return m_host;}
             const std::string& getPort() const {return m_port;}
             DrillClientError* getError(){ return m_pError;};
 
         private:
-            void parseConnectString();
             bool isDirectConnection();
             bool isZookeeperConnection();
             connectionStatus_t getDrillbitEndpointFromZk();
@@ -84,20 +90,38 @@ class UserProperties;
         SSLChannelContext(DrillUserProperties *props,
                           boost::asio::ssl::context::method tlsVersion,
                           boost::asio::ssl::verify_mode verifyMode) :
-                ChannelContext(props),
-                m_SSLContext(tlsVersion) {
+                    ChannelContext(props),
+                    m_SSLContext(tlsVersion),
+                    m_certHostnameVerificationStatus(true) 
+            {
                 m_SSLContext.set_default_verify_paths();
                 m_SSLContext.set_options(
                         boost::asio::ssl::context::default_workarounds
                         | boost::asio::ssl::context::no_sslv2
+                        | boost::asio::ssl::context::no_sslv3
                         | boost::asio::ssl::context::single_dh_use
                         );
                 m_SSLContext.set_verify_mode(verifyMode);
             };
+
             ~SSLChannelContext(){};
             boost::asio::ssl::context& getSslContext(){ return m_SSLContext;}
+
+            /// @brief Check the certificate host name verification status.
+            /// 
+            /// @return FALSE if the verification has failed, TRUE otherwise.
+            const bool GetCertificateHostnameVerificationStatus() const { return m_certHostnameVerificationStatus; }
+
+            /// @brief Set the certificate host name verification status.
+            ///
+            /// @param in_result                The host name verification status.
+            void SetCertHostnameVerificationStatus(bool in_result) { m_certHostnameVerificationStatus = in_result; }
+
         private:
             boost::asio::ssl::context m_SSLContext;
+
+            // The flag to indicate the host name verification result.
+            bool m_certHostnameVerificationStatus;
     };
 
     typedef ChannelContext ChannelContext_t; 
@@ -150,6 +174,15 @@ class UserProperties;
         protected:
             connectionStatus_t handleError(connectionStatus_t status, std::string msg);
 
+            /// @brief Handle protocol handshake exceptions.
+            /// 
+            /// @param in_errmsg                The error message.
+            /// 
+            /// @return the connectionStatus.
+            virtual connectionStatus_t HandleProtocolHandshakeException(const char* in_errmsg){
+                return handleError(CONN_HANDSHAKE_FAILED, in_errmsg);
+            }
+
             boost::asio::io_service& m_ioService;
             boost::asio::io_service m_ioServiceFallback; // used if m_ioService is not provided
             AsioStreamSocket* m_pSocket;
@@ -170,7 +203,7 @@ class UserProperties;
                 try{
                     m_pSocket->protocolHandshake(useSystemConfig);
                 } catch (boost::system::system_error e) {
-                    status = handleError(CONN_HANDSHAKE_FAILED, e.what());
+                    status = HandleProtocolHandshakeException(e.what());
                 }
                 return status;
             }
@@ -199,6 +232,29 @@ class UserProperties;
                 :Channel(ioService, host, port){
             }
             connectionStatus_t init();
+        protected:
+            /// @brief Handle protocol handshake exceptions for SSL specific failures.
+            /// 
+            /// @param in_errmsg                The error message.
+            /// 
+            /// @return the connectionStatus.
+            connectionStatus_t HandleProtocolHandshakeException(const char* errmsg) {
+                if (!(((SSLChannelContext_t *)m_pContext)->GetCertificateHostnameVerificationStatus())){
+                    return handleError(
+                        CONN_HANDSHAKE_FAILED,
+                        getMessage(ERR_CONN_SSL_CN));
+                }
+                else if (0 == strcmp(errmsg, DRILL_BOOST_SSL_CERT_VERIFY_FAILED)){
+                    return handleError(
+                        CONN_HANDSHAKE_FAILED,
+                        getMessage(ERR_CONN_SSL_CERTVERIFY, errmsg));
+                }
+                else{
+                    return handleError(
+                        CONN_HANDSHAKE_FAILED,
+                        getMessage(ERR_CONN_SSL_GENERAL, errmsg));
+                }
+            }
     };
 
     class ChannelFactory{
@@ -215,6 +271,48 @@ class UserProperties;
             static ChannelContext_t* getChannelContext(channelType_t t, DrillUserProperties* props);
     };
 
+    /// @brief Hostname verification callback wrapper.
+    class DrillSSLHostnameVerifier{
+        public:
+            /// @brief The constructor.
+            /// 
+            /// @param in_pctx                  The SSL Channel Context.
+            /// @param in_verifier              The wrapped verifier.
+            DrillSSLHostnameVerifier(SSLChannelContext_t* in_pctx, boost::asio::ssl::rfc2818_verification in_verifier) : 
+                m_verifier(in_verifier),
+                m_pctx(in_pctx){
+                DRILL_LOG(LOG_INFO)
+                    << "DrillSSLHostnameVerifier::DrillSSLHostnameVerifier: +++++ Enter +++++" 
+                    << std::endl;
+            }
+
+            /// @brief Perform certificate verification.
+            /// 
+            /// @param in_preverified           Pre-verified indicator.
+            /// @param in_ctx                   Verify context.
+            bool operator()(
+                bool in_preverified,
+                boost::asio::ssl::verify_context& in_ctx){
+                DRILL_LOG(LOG_INFO) << "DrillSSLHostnameVerifier::operator(): +++++ Enter +++++" << std::endl;
+
+                bool verified = m_verifier(in_preverified, in_ctx);
+
+                DRILL_LOG(LOG_DEBUG) 
+                    << "DrillSSLHostnameVerifier::operator(): Verification Result: " 
+                    << verified 
+                    << std::endl;
+
+                m_pctx->SetCertHostnameVerificationStatus(verified);
+                return verified;
+            }
+
+        private:
+            // The inner verifier.
+            boost::asio::ssl::rfc2818_verification m_verifier;
+
+            // The SSL channel context.
+            SSLChannelContext_t* m_pctx;
+    };
 
 } // namespace Drill
 
diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp
index c1ac80d..37f0ac1 100644
--- a/contrib/native/client/src/clientlib/errmsgs.cpp
+++ b/contrib/native/client/src/clientlib/errmsgs.cpp
@@ -57,6 +57,9 @@ static Drill::ErrorMessages errorMessages[]={
     {ERR_CONN_NOSERVERENC, ERR_CATEGORY_CONN, 0, "Client needs encryption but encryption is disabled on the server."
         " Please check connection parameters or contact administrator. [Warn: This"
         " could be due to a bad configuration or a security attack is in progress.]"},
+    {ERR_CONN_SSL_GENERAL, ERR_CATEGORY_CONN, 0, "Encountered an exception during SSL handshake. [Details: %s]"},
+    {ERR_CONN_SSL_CN, ERR_CATEGORY_CONN, 0, "SSL certificate host name verification failure." },
+    {ERR_CONN_SSL_CERTVERIFY, ERR_CATEGORY_CONN, 0, "SSL certificate verification failed. [Details: %s]"},
     {ERR_QRY_OUTOFMEM, ERR_CATEGORY_QRY, 0, "Out of memory."},
     {ERR_QRY_COMMERR, ERR_CATEGORY_QRY, 0, "Communication error. %s"},
     {ERR_QRY_INVREADLEN, ERR_CATEGORY_QRY, 0, "Internal Error: Received a message with an invalid read length."},
diff --git a/contrib/native/client/src/clientlib/errmsgs.hpp b/contrib/native/client/src/clientlib/errmsgs.hpp
index fac646b..7bcb805 100644
--- a/contrib/native/client/src/clientlib/errmsgs.hpp
+++ b/contrib/native/client/src/clientlib/errmsgs.hpp
@@ -55,7 +55,10 @@ namespace Drill{
 #define ERR_CONN_NOSOCKET       DRILL_ERR_START+23
 #define ERR_CONN_NOSERVERAUTH   DRILL_ERR_START+24
 #define ERR_CONN_NOSERVERENC    DRILL_ERR_START+25
-#define ERR_CONN_MAX            DRILL_ERR_START+25
+#define ERR_CONN_SSL_GENERAL    DRILL_ERR_START+26
+#define ERR_CONN_SSL_CN         DRILL_ERR_START+27
+#define ERR_CONN_SSL_CERTVERIFY DRILL_ERR_START+28
+#define ERR_CONN_MAX            DRILL_ERR_START+28
 
 #define ERR_QRY_OUTOFMEM    ERR_CONN_MAX+1
 #define ERR_QRY_COMMERR     ERR_CONN_MAX+2
diff --git a/contrib/native/client/src/include/drill/userProperties.hpp b/contrib/native/client/src/include/drill/userProperties.hpp
index f5d6783..fb6c764 100644
--- a/contrib/native/client/src/include/drill/userProperties.hpp
+++ b/contrib/native/client/src/include/drill/userProperties.hpp
@@ -29,8 +29,7 @@ class DECLSPEC_DRILL_CLIENT DrillUserProperties{
 
         DrillUserProperties(){};
         
-        /// @brief Update the property value associate with the property key if the value is 
-        /// empty.
+        /// @brief Sets the default property value.
         /// 
         /// @param in_propName              The property name.
         /// @param in_propValue             The property value.
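
For context, a hedged sketch of how a C++ client consumer might exercise these properties; the `USERPROP_*` names come from the diff, while the connect string and overall flow are assumptions for illustration only:

```
#include "drill/drillClient.hpp"

int main() {
    Drill::DrillUserProperties props;
    // Assumption per channel.cpp: the value "true" enables the system trust store.
    props.setProperty(USERPROP_USESYSTEMTRUSTSTORE, "true");
    // Leave USERPROP_DISABLE_HOSTVERIFICATION unset so the new
    // DrillSSLHostnameVerifier callback runs during the handshake.

    Drill::DrillClient client;
    // Hypothetical direct-connection string.
    if (client.connect("drillbit=example-host:31010", &props) != Drill::CONN_SUCCESS) {
        // Failures now surface the new ERR_CONN_SSL_* messages, which
        // distinguish host name mismatch, certificate verification failure,
        // and general SSL handshake errors.
        return 1;
    }
    client.close();
    return 0;
}
```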


[drill] 07/13: DRILL-6592: Unnest record batch size is called too frequently

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit cad9aad12ff18a9315b8cce971e27c1b32c48079
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Fri Jul 6 16:23:51 2018 -0700

    DRILL-6592: Unnest record batch size is called too frequently
    
    closes #1376
---
 .../exec/physical/impl/unnest/UnnestRecordBatch.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)
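
The fix introduces a `hasNewSchema` flag so that `memoryManager.update()`, an expensive batch-sizing pass, runs once per incoming batch or schema change instead of on every `doWork()` call. An illustrative sketch of the gating pattern only; the class below is hypothetical, not Drill code:

```
// Illustrative only: the gate ensures the costly sizing pass runs once per
// incoming batch (or schema change) instead of on every output call.
final class BatchSizingGate {
  private boolean needsUpdate = true; // set when a new batch or schema arrives

  void onNewBatch() { needsUpdate = true; }

  void beforeOutput(Runnable sizingPass) {
    if (needsUpdate) {
      sizingPass.run(); // e.g. memoryManager.update() in UnnestRecordBatch
      needsUpdate = false;
    }
  }
}
```
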

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 3ef547c..9c1e702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -49,6 +49,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
 
   private Unnest unnest;
+  private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
+                                        // sent. On the next iteration, we need to make sure the record batch sizer
+                                        // is updated before we process the actual data.
   private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                        // requires us to stop processing the current row, but not stop processing
@@ -180,6 +183,12 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       return nextState;
     }
 
+    if (hasNewSchema) {
+      memoryManager.update();
+      hasNewSchema = false;
+      return doWork();
+    }
+
     if (hasRemainder) {
       return doWork();
     }
@@ -194,7 +203,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       state = BatchState.NOT_FIRST;
       try {
         stats.startSetup();
-        hasRemainder = true; // next call to next will handle the actual data.
+        hasNewSchema = true; // next call to next will handle the actual data.
         logger.debug("First batch received");
        schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
                          // current field metadata for check in subsequent iterations
@@ -213,10 +222,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       container.zeroVectors();
       // Check if schema has changed
       if (lateral.getRecordIndex() == 0) {
-        boolean isNewSchema = schemaChanged();
-        stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
-        if (isNewSchema) {
-          hasRemainder = true;     // next call to next will handle the actual data.
+        hasNewSchema = schemaChanged();
+        stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
+        if (hasNewSchema) {
           try {
             setupNewSchema();
           } catch (SchemaChangeException ex) {
@@ -229,6 +237,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
         }
         // else
         unnest.resetGroupIndex();
+        memoryManager.update();
       }
       return doWork();
     }
@@ -265,7 +274,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
 
   protected IterOutcome doWork() {
     Preconditions.checkNotNull(lateral);
-    memoryManager.update();
     unnest.setOutputCount(memoryManager.getOutputRowCount());
     final int incomingRecordCount = incoming.getRecordCount();
     final int currentRecord = lateral.getRecordIndex();


[drill] 03/13: DRILL-6560: Enhanced the batch statistics logging enablement

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 80fb761f2cef0dc78c573081aaf83e52b32b46fc
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Fri Jun 29 17:31:40 2018 -0700

    DRILL-6560: Enhanced the batch statistics logging enablement
    
    closes #1355
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   3 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |  11 +-
 .../exec/server/options/SystemOptionManager.java   |   5 +-
 .../parquet/columnreaders/ParquetRecordReader.java |   3 +-
 .../parquet/columnreaders/VarLenBinaryReader.java  |  15 +-
 .../batchsizing/OverflowSerDeUtil.java             |  12 +-
 .../batchsizing/RecordBatchOverflow.java           |  17 +-
 .../batchsizing/RecordBatchSizerManager.java       |  45 +++--
 .../drill/exec/util/record/RecordBatchStats.java   | 181 ++++++++++++++-------
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 10 files changed, 186 insertions(+), 107 deletions(-)
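
The new `drill.exec.stats.logging.enabled_operators` option is registered below as a string system/session option (see the StringValidator and OptionDefinition additions). A hypothetical usage sketch; the option name comes from the diff, but the comma-separated operator-list value format is an assumption for illustration:

```
-- Restrict batch-sizing stats logging to selected operators
-- (value format shown here is an assumption).
ALTER SESSION SET `drill.exec.stats.logging.enabled_operators` = 'SCAN,PROJECT';
```
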

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4c840a4..d0842d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -711,5 +711,8 @@ public final class ExecConstants {
   public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size";
   public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);
 
+  /** Controls the list of operators for which batch sizing stats should be enabled */
+  public static final String STATS_LOGGING_BATCH_OPERATOR_OPTION = "drill.exec.stats.logging.enabled_operators";
+  public static final StringValidator STATS_LOGGING_BATCH_OPERATOR_VALIDATOR = new StringValidator(STATS_LOGGING_BATCH_OPERATOR_OPTION);
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 09e785e..4a2cd2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -82,7 +82,7 @@ public class ScanBatch implements CloseableRecordBatch {
   private final BufferAllocator allocator;
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
-  private final RecordBatchStatsContext batchStatsLogging;
+  private final RecordBatchStatsContext batchStatsContext;
 
   /**
    *
@@ -121,7 +121,7 @@ public class ScanBatch implements CloseableRecordBatch {
       this.implicitColumnList = implicitColumnList;
       addImplicitVectors();
       currentReader = null;
-      batchStatsLogging = new RecordBatchStatsContext(context, oContext);
+      batchStatsContext = new RecordBatchStatsContext(context, oContext);
     } finally {
       oContext.getStats().stopProcessing();
     }
@@ -304,12 +304,7 @@ public class ScanBatch implements CloseableRecordBatch {
       return; // NOOP
     }
 
-    RecordBatchStats.logRecordBatchStats(
-      batchStatsLogging.getContextOperatorId(),
-      getFQNForLogging(MAX_FQN_LENGTH),
-      this,
-      batchStatsLogging,
-      logger);
+    RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
   }
 
   /** Might truncate the FQN if too long */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a16bb4d..5ee3825 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -46,8 +46,8 @@ import com.google.common.collect.Lists;
 
 /**
  *  <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
- *  Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
- *  persist between restarts.
+ * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
+ * persist between restarts.
  *  </p>
  *
  *  <p> All the system options are externalized into conf file. While adding a new system option
@@ -235,6 +235,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index e1ca73f..e33a505 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -240,7 +241,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
     schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
-    batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead);
+    batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext));
 
     logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
         hadoopPath.toUri().getPath());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 7bdc33e..1fb224d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -34,11 +34,11 @@ import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatch
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.vector.ValueVector;
 
 /** Class which handles reading a batch of rows from a set of variable columns */
 public class VarLenBinaryReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenBinaryReader.class);
 
   final ParquetRecordReader parentReader;
   final RecordBatchSizerManager batchSizer;
@@ -170,7 +170,8 @@ public class VarLenBinaryReader {
 
         // Lazy initialization
         if (builder == null) {
-          builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator());
+          builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator(),
+            batchSizer.getBatchStatsContext());
         }
 
         final int numOverflowValues = columnStat.numValuesRead - batchNumRecords;
@@ -181,7 +182,7 @@ public class VarLenBinaryReader {
     // Register batch overflow data with the record batch sizer manager (if any)
     if (builder != null) {
       Map<String, FieldOverflowStateContainer> overflowContainerMap = parentReader.batchSizerMgr.getFieldOverflowMap();
-      Map<String, FieldOverflowDefinition> overflowDefMap           = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
+      Map<String, FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
 
       for (Map.Entry<String, FieldOverflowDefinition> entry : overflowDefMap.entrySet()) {
         FieldOverflowStateContainer overflowStateContainer = new FieldOverflowStateContainer(entry.getValue(), null);
@@ -197,9 +198,9 @@ public class VarLenBinaryReader {
     // Finally, re-order the variable length columns since an overflow occurred
     Collections.sort(orderedColumns, comparator);
 
-    if (logger.isDebugEnabled()) {
-      boolean isFirstValue    = true;
-      final StringBuilder msg = new StringBuilder(RecordBatchSizerManager.BATCH_STATS_PREFIX);
+    if (batchSizer.getBatchStatsContext().isEnableBatchSzLogging()) {
+      boolean isFirstValue = true;
+      final StringBuilder msg = new StringBuilder();
       msg.append(": Dumping the variable length columns read order: ");
 
       for (VLColumnContainer container : orderedColumns) {
@@ -212,7 +213,7 @@ public class VarLenBinaryReader {
       }
       msg.append('.');
 
-      logger.debug(msg.toString());
+      RecordBatchStats.logRecordBatchStats(msg.toString(), batchSizer.getBatchStatsContext());
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
index c542803..4a0e1e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatch
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.UInt1Vector;
 import org.apache.drill.exec.vector.UInt4Vector;
 
@@ -44,7 +46,6 @@ import org.apache.drill.exec.vector.UInt4Vector;
  * </ul>
  */
 final class OverflowSerDeUtil {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);
 
   /**
    * Serializes a collection of overflow fields into a memory buffer:
@@ -56,10 +57,12 @@ final class OverflowSerDeUtil {
    *
    * @param fieldOverflowEntries input collection of field overflow entries
    * @param allocator buffer allocator
+   * @param batchStatsContext batch statistics context object
    * @return record overflow container; null if the input buffer is empty
    */
   static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries,
-    BufferAllocator allocator) {
+    BufferAllocator allocator,
+    RecordBatchStatsContext batchStatsContext) {
 
     if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
       return null;
@@ -82,8 +85,9 @@ final class OverflowSerDeUtil {
     // Allocate the required memory to serialize the overflow fields
     final DrillBuf buffer = allocator.buffer(bufferLength);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug(String.format("Allocated a buffer of length %d to handle overflow", bufferLength));
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength);
+      RecordBatchStats.logRecordBatchStats(msg, batchStatsContext);
     }
 
     // Create the result object
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
index 76422ae..462ddf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
@@ -39,10 +40,11 @@ public final class RecordBatchOverflow {
 
   /**
    * @param allocator buffer allocator
+   * @param batchStatsContext batch statistics context
    * @return new builder object
    */
-  public static Builder newBuilder(BufferAllocator allocator) {
-    return new Builder(allocator);
+  public static Builder newBuilder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) {
+    return new Builder(allocator, batchStatsContext);
   }
 
   /**
@@ -75,13 +77,17 @@ public final class RecordBatchOverflow {
     private final List<FieldOverflowEntry> fieldOverflowEntries = new ArrayList<FieldOverflowEntry>();
     /** Buffer allocator */
     private final BufferAllocator allocator;
+    /** Batch statistics context */
+    private final RecordBatchStatsContext batchStatsContext;
 
     /**
      * Build class to construct a {@link RecordBatchOverflow} object.
      * @param allocator buffer allocator
+     * @param batchStatsContext batch statistics context
      */
-    private Builder(BufferAllocator allocator) {
+    private Builder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) {
       this.allocator = allocator;
+      this.batchStatsContext = batchStatsContext;
     }
 
     /**
@@ -101,9 +107,8 @@ public final class RecordBatchOverflow {
     * @return a new built {@link RecordBatchOverflow} object instance
      */
     public RecordBatchOverflow build() {
-      RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator);
-      RecordBatchOverflow result                =
-        new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
+      RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator, batchStatsContext);
+      RecordBatchOverflow result = new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
 
       return result;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index 01644f7..5ddcf7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -30,6 +30,8 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -39,7 +41,7 @@ import org.apache.drill.exec.vector.ValueVector;
  */
 public final class RecordBatchSizerManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
-  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
 
   /** Minimum column memory size */
   private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
@@ -78,6 +80,9 @@ public final class RecordBatchSizerManager {
    */
   private Map<String, FieldOverflowStateContainer> fieldOverflowMap = CaseInsensitiveMap.newHashMap();
 
+  /** For controlling batch statistics logging */
+  private final RecordBatchStatsContext batchStatsContext;
+
   /**
    * Constructor.
    *
@@ -87,7 +92,8 @@ public final class RecordBatchSizerManager {
    */
   public RecordBatchSizerManager(OptionManager options,
     ParquetSchema schema,
-    long totalRecordsToRead) {
+    long totalRecordsToRead,
+    RecordBatchStatsContext batchStatsContext) {
 
     this.schema = schema;
     this.totalRecordsToRead = totalRecordsToRead;
@@ -97,6 +103,7 @@ public final class RecordBatchSizerManager {
     this.maxRecordsPerBatch = this.configRecordsPerBatch;
     this.recordsPerBatch = this.configRecordsPerBatch;
     this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap);
+    this.batchStatsContext = batchStatsContext;
   }
 
   /**
@@ -131,6 +138,13 @@ public final class RecordBatchSizerManager {
   }
 
   /**
+   * @return batch statistics context
+   */
+  public RecordBatchStatsContext getBatchStatsContext() {
+    return batchStatsContext;
+  }
+
+  /**
    * Allocates value vectors for the current batch.
    *
    * @param vectorMap a collection of value vectors keyed by their field names
@@ -282,10 +296,9 @@ public final class RecordBatchSizerManager {
       normalizedNumRecords = (int) totalRecordsToRead;
     }
 
-    if (logger.isDebugEnabled()) {
-      final String message = String.format("%s: The Parquet reader number of record(s) has been set to [%d]",
-        BATCH_STATS_PREFIX, normalizedNumRecords);
-      logger.debug(message);
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String message = String.format("The Parquet reader number of record(s) has been set to [%d]", normalizedNumRecords);
+      RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
     }
 
     return normalizedNumRecords;
@@ -319,10 +332,9 @@ public final class RecordBatchSizerManager {
       logger.warn(message);
     }
 
-    if (logger.isDebugEnabled()) {
-      final String message = String.format("%s: The Parquet reader batch memory has been set to [%d] byte(s)",
-        BATCH_STATS_PREFIX, normalizedMemorySize);
-      logger.debug(message);
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String message = String.format("The Parquet reader batch memory has been set to [%d] byte(s)", normalizedMemorySize);
+      RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
     }
 
     return normalizedMemorySize;
@@ -370,13 +382,12 @@ public final class RecordBatchSizerManager {
     assignFineGrainedMemoryQuota();
 
     // log the new record batch if it changed
-    if (logger.isDebugEnabled()) {
+    if (batchStatsContext.isEnableBatchSzLogging()) {
       assert recordsPerBatch <= maxRecordsPerBatch;
 
       if (originalRecordsPerBatch != recordsPerBatch) {
-        final String message = String.format("%s: The Parquet records per batch [%d] has been decreased to [%d]",
-          BATCH_STATS_PREFIX, originalRecordsPerBatch, recordsPerBatch);
-        logger.debug(message);
+        final String message = String.format("The Parquet records per batch [%d] has been decreased to [%d]", originalRecordsPerBatch, recordsPerBatch);
+        RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
       }
 
       // Now dump the per column memory quotas
@@ -504,12 +515,12 @@ public final class RecordBatchSizerManager {
   }
 
   private void dumpColumnMemoryQuotas() {
-    StringBuilder msg = new StringBuilder(BATCH_STATS_PREFIX);
+    StringBuilder msg = new StringBuilder();
     msg.append(": Field Quotas:\n\tName\tType\tPrec\tQuota\n");
 
     for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) {
       msg.append("\t");
-      msg.append(BATCH_STATS_PREFIX);
+      msg.append(RecordBatchStats.BATCH_STATS_PREFIX);
       msg.append("\t");
       msg.append(columnInfo.columnMeta.getField().getName());
       msg.append("\t");
@@ -521,7 +532,7 @@ public final class RecordBatchSizerManager {
       msg.append("\n");
     }
 
-    logger.debug(msg.toString());
+    RecordBatchStats.logRecordBatchStats(msg.toString(), batchStatsContext);
   }
 
   private  static void printType(MaterializedField field, StringBuilder msg) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
index 8b213a8..0b24244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.util.record;
 
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -25,7 +24,6 @@ import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
@@ -34,13 +32,14 @@ import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
  * Utility class to capture key record batch statistics.
  */
 public final class RecordBatchStats {
+  // Logger
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStats.class);
+
   /** A prefix for all batch stats to simplify search */
   public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
 
   /** Helper class which loads contextual record batch logging options */
   public static final class RecordBatchStatsContext {
-    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStatsContext.class);
-
     /** batch size logging for all readers */
     private final boolean enableBatchSzLogging;
     /** Fine grained batch size logging */
@@ -52,8 +51,17 @@ public final class RecordBatchStats {
      * @param options options manager
      */
     public RecordBatchStatsContext(FragmentContext context, OperatorContext oContext) {
-      enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
-      enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+      final boolean operatorEnabledForStatsLogging = isBatchStatsEnabledForOperator(context, oContext);
+
+      if (operatorEnabledForStatsLogging) {
+        enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
+        enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+
+      } else {
+        enableBatchSzLogging = false;
+        enableFgBatchSzLogging = false;
+      }
+
       contextOperatorId = new StringBuilder()
         .append(getQueryId(context))
         .append(":")
@@ -100,6 +108,104 @@ public final class RecordBatchStats {
       }
       return "NA";
     }
+
+    private boolean isBatchStatsEnabledForOperator(FragmentContext context, OperatorContext oContext) {
+      // The configuration can select what operators should log batch statistics
+      final String statsLoggingOperator = context.getOptions().getString(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_OPTION).toUpperCase();
+      final String allOperatorsStr = "ALL";
+
+      // All operators are allowed to log batch statistics
+      if (allOperatorsStr.equals(statsLoggingOperator)) {
+        return true;
+      }
+
+      // No, only a select few are allowed; syntax: operator-id-1,operator-id-2,..
+      final String[] operators = statsLoggingOperator.split(",");
+      final String operatorId = oContext.getStats().getId().toUpperCase();
+
+      for (int idx = 0; idx < operators.length; idx++) {
+        // We use "contains" because the operator identifier is a composite string; e.g., 3:[PARQUET_ROW_GROUP_SCAN]
+        if (operatorId.contains(operators[idx].trim())) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+  }
+
+  /**
+   * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)}
+   */
+  public static void logRecordBatchStats(RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    logRecordBatchStats(null, recordBatch, batchStatsContext);
+  }
+
+  /**
+   * Logs record batch statistics for the input record batch (logging happens only
+   * when record statistics logging is enabled).
+   *
+   * @param sourceId optional source identifier for scanners
+   * @param recordBatch a set of records
+   * @param batchStatsContext batch stats context object
+   */
+  public static void logRecordBatchStats(String sourceId,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    final String statsId = batchStatsContext.getContextOperatorId();
+    final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging();
+    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+
+    logBatchStatsMsg(batchStatsContext, msg, false);
+  }
+
+  /**
+   * Logs a generic batch statistics message
+   *
+   * @param message log message
+   * @param batchStatsLogging
+   * @param batchStatsContext batch stats context object
+   */
+  public static void logRecordBatchStats(String message,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    logBatchStatsMsg(batchStatsContext, message, true);
+  }
+
+  /**
+   * @param allocator dumps allocator statistics
+   * @return string with allocator statistics
+   */
+  public static String printAllocatorStats(BufferAllocator allocator) {
+    StringBuilder msg = new StringBuilder();
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": dumping allocator statistics:\n");
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": ");
+    msg.append(allocator.toString());
+
+    return msg.toString();
+  }
+
+// ----------------------------------------------------------------------------
+// Local Implementation
+// ----------------------------------------------------------------------------
+
+  /**
+   * Disabling class object instantiation.
+   */
+  private RecordBatchStats() {
   }
 
   /**
@@ -112,7 +218,7 @@ public final class RecordBatchStats {
    *
    * @return a string containing the record batch statistics
    */
-  public static String printRecordBatchStats(String statsId,
+  private static String printRecordBatchStats(String statsId,
     String sourceId,
     RecordBatch recordBatch,
     boolean verbose) {
@@ -158,68 +264,19 @@ public final class RecordBatchStats {
     return msg.toString();
   }
 
-  /**
-   * Logs record batch statistics for the input record batch (logging happens only
-   * when record statistics logging is enabled).
-   *
-   * @param stats instance identifier
-   * @param sourceId optional source identifier for scanners
-   * @param recordBatch a set of records
-   * @param verbose whether to include fine-grained stats
-   * @param logger Logger where to print the record batch statistics
-   */
-  public static void logRecordBatchStats(String statsId,
-    String sourceId,
-    RecordBatch recordBatch,
-    RecordBatchStatsContext batchStatsLogging,
-    org.slf4j.Logger logger) {
+  private static void logBatchStatsMsg(RecordBatchStatsContext batchStatsContext,
+    String msg,
+    boolean includePrefix) {
 
-    if (!batchStatsLogging.isEnableBatchSzLogging()) {
-      return; // NOOP
+    if (includePrefix) {
+      msg = BATCH_STATS_PREFIX + '\t' + msg;
     }
 
-    final boolean verbose = batchStatsLogging.isEnableFgBatchSzLogging();
-    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
-
-    if (batchStatsLogging.useInfoLevelLogging()) {
+    if (batchStatsContext.useInfoLevelLogging()) {
       logger.info(msg);
     } else {
       logger.debug(msg);
     }
   }
 
-  /**
-   * Prints a materialized field type
-   * @param field materialized field
-   * @param msg string builder where to append the field type
-   */
-  public static void printFieldType(MaterializedField field, StringBuilder msg) {
-    final MajorType type = field.getType();
-
-    msg.append(type.getMinorType().name());
-    msg.append(':');
-    msg.append(type.getMode().name());
-  }
-
-  /**
-   * @param allocator dumps allocator statistics
-   * @return string with allocator statistics
-   */
-  public static String printAllocatorStats(BufferAllocator allocator) {
-    StringBuilder msg = new StringBuilder();
-    msg.append(BATCH_STATS_PREFIX);
-    msg.append(": dumping allocator statistics:\n");
-    msg.append(BATCH_STATS_PREFIX);
-    msg.append(": ");
-    msg.append(allocator.toString());
-
-    return msg.toString();
-  }
-
-  /**
-   * Disabling class object instantiation.
-   */
-  private RecordBatchStats() {
-  }
-
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b0cc209..19e779d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -488,6 +488,7 @@ drill.exec.options: {
     exec.udf.use_dynamic: true,
     drill.exec.stats.logging.batch_size: false,
     drill.exec.stats.logging.fine_grained.batch_size: false,
+    drill.exec.stats.logging.enabled_operators: all,
     new_view_default_permissions: 700,
     org.apache.drill.exec.compile.ClassTransformer.scalar_replacement: "try",
     planner.add_producer_consumer: false,
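
Taken together, the three options above drive the new batch-stats logging. Below is a rough usage sketch (class and method names other than the Drill types shown in the diffs are illustrative) of how an operator-side reader obtains a stats context and emits a message once "drill.exec.stats.logging.batch_size" is true and the operator id matches "drill.exec.stats.logging.enabled_operators" (a comma-separated list such as "PARQUET_ROW_GROUP_SCAN,PROJECT", or the default "all"):

    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.ops.OperatorContext;
    import org.apache.drill.exec.util.record.RecordBatchStats;
    import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;

    final class BatchStatsUsageSketch {
      // Illustrative only: assumes both contexts are available, as in ParquetRecordReader.setup().
      static void logReaderStats(FragmentContext context, OperatorContext oContext) {
        // Reads the stats options and applies the per-operator filter internally.
        RecordBatchStatsContext statsContext = new RecordBatchStatsContext(context, oContext);

        // Guard so message construction costs nothing when logging is disabled.
        if (statsContext.isEnableBatchSzLogging()) {
          // Prefixed with BATCH_STATS and routed to INFO or DEBUG by the context.
          RecordBatchStats.logRecordBatchStats("reader initialized", statsContext);
        }
      }
    }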


[drill] 02/13: DRILL-6578: Handle query cancellation in Parquet reader

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit cfe61eb9b6ed4d636a26c76dbd12df26f38ba672
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Mon Jul 2 19:13:26 2018 -0700

    DRILL-6578: Handle query cancellation in Parquet reader
    
    closes #1360
---
 .../drill/common/exceptions/DrillRuntimeException.java | 18 ++++++++++++++++++
 .../main/codegen/templates/VariableLengthVectors.java  |  4 +++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
index 98b1a9d..b6ced84 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java
@@ -48,4 +48,22 @@ public class DrillRuntimeException extends RuntimeException {
   public static void format(Throwable cause, String format, Object...args) {
     throw new DrillRuntimeException(String.format(format, args), cause);
   }
+
+  /**
+   * This method can be called within loops to check whether the current thread has been
+   * interrupted; it ensures that operator implementation can respond to query cancellation
+   * in a timely manner.
+   *
+   * <p>Calling this method will result in the following behavior:
+   * <ul>
+   * <li>Throws a runtime exception if current thread interrupt flag has been set
+   * <li>Clears current thread interrupt flag
+   * </ul>
+   */
+  public static void checkInterrupted() {
+    if (Thread.interrupted()) {
+      // This exception will ensure the control layer will immediately get back control
+      throw new DrillRuntimeException("Interrupt received; aborting current operation");
+    }
+  }
 }
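
The new helper is meant to be dropped into hot loops, as the VariableLengthVectors change below does after each bulk entry. A minimal, self-contained sketch of the pattern (the loop and its per-entry work are hypothetical, not from the patch):

    import org.apache.drill.common.exceptions.DrillRuntimeException;

    final class BulkCopySketch {
      // Hypothetical bulk loop; copyOneEntry stands in for any per-entry work.
      static void copyAll(byte[][] entries) {
        for (byte[] entry : entries) {
          copyOneEntry(entry);
          // Surfaces query cancellation promptly: throws if this thread was interrupted.
          DrillRuntimeException.checkInterrupted();
        }
      }

      private static void copyOneEntry(byte[] entry) {
        // real work elided
      }
    }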
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index c35728e..8dd8eb1 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -19,7 +19,7 @@ import java.lang.Override;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Set;
-
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.vector.BaseDataValueVector;
@@ -641,6 +641,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
         if (callback != null) {
           callback.onNewBulkEntry(entry);
         }
+
+        DrillRuntimeException.checkInterrupted(); // Ensures fast handling of query cancellation
       }
 
       // Flush any data not yet copied to this VL container


[drill] 06/13: DRILL-6596: Fix fillEmpties and set methods for Nullable variable length vectors to not use emptyByteArray

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit c396ae71f36c3ac409d48b8977aac12d28f25077
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Fri Jul 6 16:23:51 2018 -0700

    DRILL-6596: Fix fillEmpties and set methods for Nullable variable length
    vectors to not use emptyByteArray
    
    closes #1377
---
 .../codegen/templates/NullableValueVectors.java    | 22 ++++++----------------
 1 file changed, 6 insertions(+), 16 deletions(-)

diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index f30cfae..ff066fb 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -579,9 +579,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       final UInt1Vector.Mutator bitsMutator = bits.getMutator();
       <#if type.major == "VarLen">
-      for (int i = lastSet + 1; i < index; i++) {
-        valuesMutator.set(i, emptyByteArray);
-      }
+      valuesMutator.fillEmpties(lastSet, index);
       </#if>
       bitsMutator.set(index, 1);
       valuesMutator.set(index, value);
@@ -591,9 +589,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     <#if type.major == "VarLen">
     private void fillEmpties(int index) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
-      for (int i = lastSet; i < index; i++) {
-        valuesMutator.setSafe(i + 1, emptyByteArray);
-      }
+      valuesMutator.fillEmpties(lastSet, index+1);
       while(index > bits.getValueCapacity()) {
         bits.reAlloc();
       }
@@ -644,9 +640,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public void set(int index, Nullable${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
-      for (int i = lastSet + 1; i < index; i++) {
-        valuesMutator.set(i, emptyByteArray);
-      }
+      valuesMutator.fillEmpties(lastSet, index);
       </#if>
       bits.getMutator().set(index, holder.isSet);
       valuesMutator.set(index, holder);
@@ -656,9 +650,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public void set(int index, ${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
-      for (int i = lastSet + 1; i < index; i++) {
-        valuesMutator.set(i, emptyByteArray);
-      }
+      valuesMutator.fillEmpties(lastSet, index);
       </#if>
       bits.getMutator().set(index, 1);
       valuesMutator.set(index, holder);
@@ -673,9 +665,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public void set(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
-      for (int i = lastSet + 1; i < index; i++) {
-        valuesMutator.set(i, emptyByteArray);
-      }
+      valuesMutator.fillEmpties(lastSet, index);
       </#if>
       bits.getMutator().set(index, isSet);
       valuesMutator.set(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
@@ -887,4 +877,4 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   }
 }
 </#list>
-</#list>
\ No newline at end of file
+</#list>
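
The removed loops wrote a zero-length value (emptyByteArray) into every skipped slot, one set()/setSafe() call at a time; delegating to the values vector's own fillEmpties lets it close the gap in a single pass over its offsets. A rough sketch of what a fillEmpties(lastSet, index) of this shape amounts to for an offsets-based variable-length vector (illustrative only; the real mutator also handles buffer reallocation and bounds checks):

    final class FillEmptiesSketch {
      // offsets[i + 1] - offsets[i] is the length of value i; an empty value is
      // just a repeated end offset, so no data bytes need to be written at all.
      static void fillEmpties(int[] offsets, int lastSet, int index) {
        final int endOfLastValue = offsets[lastSet + 1];
        for (int i = lastSet + 1; i < index; i++) {
          offsets[i + 1] = endOfLastValue; // zero-length entry: start == end
        }
      }
    }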


[drill] 01/13: DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit c64436774387e80fd9b0ff6e9cd7d42c9aa7a961
Author: karthik <km...@maprtech.com>
AuthorDate: Mon Jun 4 17:00:31 2018 -0700

    DRILL-6594: Data batches for Project operator are not being split properly and exceed the maximum specified
    
    This change fixes the incorrect accounting in the case where a column is projected more than once
    
    closes #1375
---
 .../impl/project/OutputWidthExpression.java        | 17 ++++----
 .../physical/impl/project/OutputWidthVisitor.java  |  2 +-
 .../impl/project/OutputWidthVisitorState.java      |  7 +---
 .../impl/project/ProjectMemoryManager.java         | 48 ++++++++++------------
 .../physical/impl/project/ProjectRecordBatch.java  | 22 +++++-----
 5 files changed, 42 insertions(+), 54 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
index b9240d6..84a3f46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthExpression.java
@@ -95,30 +95,31 @@ public abstract class OutputWidthExpression {
     }
 
     /**
-     * VarLenReadExpr captures the name of a variable length column that is used (read) in an expression.
-     * The captured name will be used to lookup the average entry size for the column in the corresponding
+     * VarLenReadExpr captures the inputColumnName and the readExpression used to read a variable length column.
+     * The captured inputColumnName will be used to lookup the average entry size for the column in the corresponding.
+     * If inputColumnName is null then the readExpression is used to get the name of the column.
      * {@link org.apache.drill.exec.record.RecordBatchSizer}
      */
     public static class VarLenReadExpr extends OutputWidthExpression  {
         ValueVectorReadExpression readExpression;
-        String name;
+        String inputColumnName;
 
         public VarLenReadExpr(ValueVectorReadExpression readExpression) {
             this.readExpression = readExpression;
-            this.name = null;
+            this.inputColumnName = null;
         }
 
-        public VarLenReadExpr(String name) {
+        public VarLenReadExpr(String inputColumnName) {
             this.readExpression = null;
-            this.name = name;
+            this.inputColumnName = inputColumnName;
         }
 
         public ValueVectorReadExpression getReadExpression() {
             return readExpression;
         }
 
-        public String getName() {
-            return name;
+        public String getInputColumnName() {
+            return inputColumnName;
         }
 
         @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
index cb58795..70908bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitor.java
@@ -205,7 +205,7 @@ public class OutputWidthVisitor extends AbstractExecExprVisitor<OutputWidthExpre
     @Override
     public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr, OutputWidthVisitorState state)
                                                         throws RuntimeException {
-        String columnName = varLenReadExpr.getName();
+        String columnName = varLenReadExpr.getInputColumnName();
         if (columnName == null) {
             TypedFieldId fieldId = varLenReadExpr.getReadExpression().getTypedFieldId();
             columnName =  TypedFieldId.getPath(fieldId, state.manager.getIncomingBatch());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
index c0e0cb1..e18c827 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/OutputWidthVisitorState.java
@@ -20,18 +20,13 @@ package org.apache.drill.exec.physical.impl.project;
 public class OutputWidthVisitorState {
 
     ProjectMemoryManager manager;
-    ProjectMemoryManager.OutputColumnType outputColumnType;
 
-    public OutputWidthVisitorState(ProjectMemoryManager manager, ProjectMemoryManager.OutputColumnType outputColumnType) {
+    public OutputWidthVisitorState(ProjectMemoryManager manager) {
         this.manager = manager;
-        this.outputColumnType = outputColumnType;
     }
 
     public ProjectMemoryManager getManager() {
         return manager;
     }
 
-    public ProjectMemoryManager.OutputColumnType getOutputColumnType() {
-        return outputColumnType;
-    }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index f461b09..03c849c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -88,15 +89,12 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
     }
 
     class ColumnWidthInfo {
-        //MaterializedField materializedField;
         OutputWidthExpression outputExpression;
         int width;
         WidthType widthType;
         OutputColumnType outputColumnType;
-        String name;
 
-        ColumnWidthInfo(ValueVector vv,
-                        OutputWidthExpression outputWidthExpression,
+        ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
                         OutputColumnType outputColumnType,
                         WidthType widthType,
                         int fieldWidth) {
@@ -104,8 +102,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
             this.width = fieldWidth;
             this.outputColumnType = outputColumnType;
             this.widthType = widthType;
-            String columnName = vv.getField().getName();
-            this.name = columnName;
         }
 
         public OutputWidthExpression getOutputExpression() { return outputExpression; }
@@ -116,7 +112,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
 
         public int getWidth() { return width; }
 
-        public String getName() { return name; }
     }
 
     void ShouldNotReachHere() {
@@ -180,43 +175,44 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
     }
 
 
-    void addTransferField(ValueVector vvOut, String path) {
-        addField(vvOut, null, OutputColumnType.TRANSFER, path);
+    void addTransferField(ValueVector vvIn, String inputColumnName, String outputColumnName) {
+        addField(vvIn, null, OutputColumnType.TRANSFER, inputColumnName, outputColumnName);
     }
 
-    void addNewField(ValueVector vv, LogicalExpression logicalExpression) {
-        addField(vv, logicalExpression, OutputColumnType.NEW, null);
+    void addNewField(ValueVector vvOut, LogicalExpression logicalExpression) {
+        addField(vvOut, logicalExpression, OutputColumnType.NEW, null, vvOut.getField().getName());
     }
 
-    void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType, String path) {
+    void addField(ValueVector vv, LogicalExpression logicalExpression, OutputColumnType outputColumnType,
+                  String inputColumnName, String outputColumnName) {
         if(isFixedWidth(vv)) {
             addFixedWidthField(vv);
         } else {
-            addVariableWidthField(vv, logicalExpression, outputColumnType, path);
+            addVariableWidthField(vv, logicalExpression, outputColumnType, inputColumnName, outputColumnName);
         }
     }
 
     private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpression,
-                                       OutputColumnType outputColumnType, String path) {
+                                       OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
         variableWidthColumnCount++;
         ColumnWidthInfo columnWidthInfo;
         //Variable width transfers
         if(outputColumnType == OutputColumnType.TRANSFER) {
-            String columnName = path;
-            VarLenReadExpr readExpr = new VarLenReadExpr(columnName);
-            columnWidthInfo = new ColumnWidthInfo(vv, readExpr, outputColumnType,
+            VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
+            columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
         } else if (isComplex(vv.getField().getType())) {
             addComplexField(vv);
             return;
         } else {
             // Walk the tree of LogicalExpressions to get a tree of OutputWidthExpressions
-            OutputWidthVisitorState state = new OutputWidthVisitorState(this, outputColumnType);
+            OutputWidthVisitorState state = new OutputWidthVisitorState(this);
             OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
-            columnWidthInfo = new ColumnWidthInfo(vv, outputWidthExpression, outputColumnType,
+            columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
                     WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
         }
-        outputColumnSizes.put(columnWidthInfo.getName(), columnWidthInfo);
+        ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
+        Preconditions.checkState(existingInfo == null);
     }
 
     void addComplexField(ValueVector vv) {
@@ -258,8 +254,8 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
         setRecordBatchSizer(batchSizer);
         rowWidth = 0;
         int totalVariableColumnWidth = 0;
-        for (String expr : outputColumnSizes.keySet()) {
-            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(expr);
+        for (String outputColumnName : outputColumnSizes.keySet()) {
+            ColumnWidthInfo columnWidthInfo = outputColumnSizes.get(outputColumnName);
             int width = -1;
             if (columnWidthInfo.isFixedWidth()) {
                 // fixed width columns are accumulated in totalFixedWidthColumnWidth
@@ -269,12 +265,10 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
                 //As the tree is walked, the RecordBatchSizer and function annotations
                 //are looked-up to come up with the final FixedLenExpr
                 OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
-                OutputColumnType columnType = columnWidthInfo.getOutputColumnType();
-                OutputWidthVisitorState state = new OutputWidthVisitorState(this, columnType);
+                OutputWidthVisitorState state = new OutputWidthVisitorState(this);
                 OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
-                assert reducedExpr instanceof FixedLenExpr;
                 width = ((FixedLenExpr)reducedExpr).getWidth();
-                assert width >= 0;
+                Preconditions.checkState(width >= 0);
             }
             totalVariableColumnWidth += width;
         }
@@ -301,7 +295,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
         logger.trace("update() : Output RC {}, BatchSizer RC {}, incoming RC {}, width {}, total fixed width {}"
                     + ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {}  ms"
                     + ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(),
-                    totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
+                    rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
                     (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
 
         logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4bc63c0..dd93325 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -113,11 +113,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
   public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
-
-    // get the output batch size from config.
-    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-
-    memoryManager = new ProjectMemoryManager(configuredBatchSize);
   }
 
   @Override
@@ -367,6 +362,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
   private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
     long setupNewSchemaStartTime = System.currentTimeMillis();
+    // get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    memoryManager = new ProjectMemoryManager(configuredBatchSize);
     memoryManager.init(incomingBatch, this);
     if (allocationVectors != null) {
       for (final ValueVector v : allocationVectors) {
@@ -431,7 +429,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
               final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
                 vvIn.getField().getType()), callBack);
               final TransferPair tp = vvIn.makeTransferPair(vvOut);
-              memoryManager.addTransferField(vvIn, vvIn.getField().getName());
+              memoryManager.addTransferField(vvIn, vvIn.getField().getName(), vvOut.getField().getName());
               transfers.add(tp);
             }
           } else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors
@@ -513,7 +511,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
             vectorRead.getMajorType()), callBack);
         final TransferPair tp = vvIn.makeTransferPair(vvOut);
-        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch));
+        memoryManager.addTransferField(vvIn, TypedFieldId.getPath(id, incomingBatch), vvOut.getField().getName());
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
       } else if (expr instanceof DrillFuncHolderExpr &&
@@ -540,13 +538,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         memoryManager.addComplexField(null); // this will just add an estimate to the row width
       } else {
         // need to do evaluation.
-        final ValueVector vector = container.addOrGet(outputField, callBack);
-        allocationVectors.add(vector);
+        final ValueVector ouputVector = container.addOrGet(outputField, callBack);
+        allocationVectors.add(ouputVector);
         final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
-        final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
         final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
         final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
-        memoryManager.addNewField(vector, write);
+        memoryManager.addNewField(ouputVector, write);
 
         // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
         if (expr instanceof ValueVectorReadExpression) {
@@ -555,7 +553,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             final TypedFieldId id = vectorRead.getFieldId();
             final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(),
                     id.getFieldIds()).getValueVector();
-            vvIn.makeTransferPair(vector);
+            vvIn.makeTransferPair(ouputVector);
           }
         }
       }
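
The bug is easiest to reproduce with a query that projects the same input column twice, e.g. SELECT name, name AS name2 FROM t. Because the width map was keyed by the input column name, the second projection overwrote the first, the estimated row width came up one column short, and output batches could exceed the configured maximum. A minimal sketch of the effect (plain Java, not Drill code):

    import java.util.HashMap;
    import java.util.Map;

    final class WidthAccountingSketch {
      public static void main(String[] args) {
        // Pre-patch: keyed by INPUT name, so duplicate projections collapse.
        Map<String, Integer> byInputName = new HashMap<>();
        byInputName.put("name", 40);   // SELECT name ...
        byInputName.put("name", 40);   // ..., name AS name2 -- overwrites
        System.out.println(byInputName.size()); // 1 -> row width underestimated

        // Post-patch: keyed by OUTPUT name, so both projections are counted
        // (and Preconditions.checkState catches accidental collisions).
        Map<String, Integer> byOutputName = new HashMap<>();
        byOutputName.put("name", 40);
        byOutputName.put("name2", 40);
        System.out.println(byOutputName.size()); // 2 -> correct accounting
      }
    }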


[drill] 13/13: DRILL-6601 LageFileCompilation testProject times out

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit eb90ebdfd6e40fe1769bfedd320fabc1e57c723b
Author: karthik <km...@maprtech.com>
AuthorDate: Thu Jul 12 11:29:01 2018 -0700

    DRILL-6601 LageFileCompilation testProject times out
    
    closes #1378
---
 .../java/org/apache/drill/exec/compile/TestLargeFileCompilation.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 86ef855..084107d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -51,7 +51,7 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   private static final int NUM_PROJECT_COLUMNS = 2500;
 
-  private static final int NUM_PROJECT_TEST_COLUMNS = 10000;
+  private static final int NUM_PROJECT_TEST_COLUMNS = 5000;
 
   private static final int NUM_ORDERBY_COLUMNS = 500;
 


[drill] 11/13: [DRILL-6587] Added support for custom SSL CTX Options

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 94186fc54f2b2846955d44ced5ed06b1ae209884
Author: superbstreak <ro...@gmail.com>
AuthorDate: Mon Jul 9 12:07:22 2018 -0700

    [DRILL-6587] Added support for custom SSL CTX Options
    
    closes #1366
---
 contrib/native/client/example/querySubmitter.cpp   |  7 +++-
 contrib/native/client/src/clientlib/channel.cpp    | 14 +++++++-
 contrib/native/client/src/clientlib/channel.hpp    | 39 ++++++++++++----------
 contrib/native/client/src/clientlib/errmsgs.cpp    |  2 +-
 .../native/client/src/clientlib/userProperties.cpp |  1 +
 contrib/native/client/src/include/drill/common.hpp |  3 +-
 6 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 519dd93..a84d1db 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -54,7 +54,8 @@ struct Option{
     {"certFilePath", "Path to SSL certificate file", false},
     {"disableHostnameVerification", "disable host name verification", false},
     {"disableCertVerification", "disable certificate verification", false},
-    {"useSystemTrustStore", "[Windows only]. Use the system truststore.", false }
+    {"useSystemTrustStore", "[Windows only]. Use the system truststore.", false },
+    {"CustomSSLCtxOptions", "The custom SSL CTX Options", false}
 
 };
 
@@ -315,6 +316,7 @@ int main(int argc, char* argv[]) {
         std::string disableHostnameVerification=qsOptionValues["disableHostnameVerification"];
         std::string disableCertVerification=qsOptionValues["disableCertVerification"];
         std::string useSystemTrustStore = qsOptionValues["useSystemTrustStore"];
+        std::string customSSLOptions = qsOptionValues["CustomSSLCtxOptions"];
 
         Drill::QueryType type;
 
@@ -416,6 +418,9 @@ int main(int argc, char* argv[]) {
 			if (useSystemTrustStore.length() > 0){
 				props.setProperty(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
 			}
+            if (customSSLOptions.length() > 0){
+                props.setProperty(USERPROP_CUSTOM_SSLCTXOPTIONS, customSSLOptions);
+            }
         }
 
         if(client.connect(connectStr.c_str(), &props)!=Drill::CONN_SUCCESS){
diff --git a/contrib/native/client/src/clientlib/channel.cpp b/contrib/native/client/src/clientlib/channel.cpp
index e368cd0..bdc19f7 100644
--- a/contrib/native/client/src/clientlib/channel.cpp
+++ b/contrib/native/client/src/clientlib/channel.cpp
@@ -210,7 +210,19 @@ ChannelContext* ChannelFactory::getChannelContext(channelType_t t, DrillUserProp
                 verifyMode = boost::asio::ssl::context::verify_none;
             }
 
-            pChannelContext = new SSLChannelContext(props, tlsVersion, verifyMode);
+            long customSSLCtxOptions = 0;
+            std::string sslOptions;
+            props->getProp(USERPROP_CUSTOM_SSLCTXOPTIONS, sslOptions);
+            if (!sslOptions.empty()){
+                try{
+                    customSSLCtxOptions = boost::lexical_cast<long>(sslOptions);
+                }
+                catch (...){
+                      DRILL_LOG(LOG_ERROR) << "Unable to parse custom SSL CTX options." << std::endl;
+                 }
+            }
+
+            pChannelContext = new SSLChannelContext(props, tlsVersion, verifyMode, customSSLCtxOptions);
         }
             break;
 #endif
diff --git a/contrib/native/client/src/clientlib/channel.hpp b/contrib/native/client/src/clientlib/channel.hpp
index 76bedde..fec4659 100644
--- a/contrib/native/client/src/clientlib/channel.hpp
+++ b/contrib/native/client/src/clientlib/channel.hpp
@@ -23,11 +23,10 @@
 #include "streamSocket.hpp"
 #include "errmsgs.hpp"
 
-namespace
-{
-// The error message to indicate certificate verification failure.
-#define DRILL_BOOST_SSL_CERT_VERIFY_FAILED  "handshake: certificate verify failed\0"
-}
+#if defined(IS_SSL_ENABLED)
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#endif
 
 namespace Drill {
 
@@ -90,7 +89,8 @@ class UserProperties;
 
         SSLChannelContext(DrillUserProperties *props,
                           boost::asio::ssl::context::method tlsVersion,
-                          boost::asio::ssl::verify_mode verifyMode) :
+                          boost::asio::ssl::verify_mode verifyMode,
+                          const long customSSLCtxOptions = 0) :
                     ChannelContext(props),
                     m_SSLContext(tlsVersion),
                     m_certHostnameVerificationStatus(true) 
@@ -101,6 +101,7 @@ class UserProperties;
                         | boost::asio::ssl::context::no_sslv2
                         | boost::asio::ssl::context::no_sslv3
                         | boost::asio::ssl::context::single_dh_use
+                        | customSSLCtxOptions
                         );
                 m_SSLContext.set_verify_mode(verifyMode);
             };
@@ -179,11 +180,11 @@ class UserProperties;
 
             /// @brief Handle protocol handshake exceptions.
             /// 
-            /// @param in_errmsg                The error message.
+            /// @param in_err                   The error.
             /// 
             /// @return the connectionStatus.
-            virtual connectionStatus_t HandleProtocolHandshakeException(const char* in_errmsg){
-                return handleError(CONN_HANDSHAKE_FAILED, in_errmsg);
+            virtual connectionStatus_t HandleProtocolHandshakeException(const boost::system::system_error& in_err){
+                return handleError(CONN_HANDSHAKE_FAILED, in_err.what());
             }
 
             boost::asio::io_service& m_ioService;
@@ -206,7 +207,7 @@ class UserProperties;
                 try{
                     m_pSocket->protocolHandshake(useSystemConfig);
                 } catch (boost::system::system_error e) {
-                    status = HandleProtocolHandshakeException(e.what());
+                    status = HandleProtocolHandshakeException(e);
                 }
                 return status;
             }
@@ -236,28 +237,32 @@ class UserProperties;
             }
             connectionStatus_t init();
         protected:
+#if defined(IS_SSL_ENABLED)
             /// @brief Handle protocol handshake exceptions for SSL specific failures.
             /// 
-            /// @param in_errmsg                The error message.
+            /// @param in_err               The error.
             /// 
             /// @return the connectionStatus.
-            connectionStatus_t HandleProtocolHandshakeException(const char* errmsg) {
+            connectionStatus_t HandleProtocolHandshakeException(const boost::system::system_error& in_err) {
+                const boost::system::error_code& errcode = in_err.code();
                 if (!(((SSLChannelContext_t *)m_pContext)->GetCertificateHostnameVerificationStatus())){
                     return handleError(
                         CONN_HANDSHAKE_FAILED,
-                        getMessage(ERR_CONN_SSL_CN));
+                        getMessage(ERR_CONN_SSL_CN, in_err.what()));
                 }
-                else if (0 == strcmp(errmsg, DRILL_BOOST_SSL_CERT_VERIFY_FAILED)){
+                else if (boost::asio::error::get_ssl_category() == errcode.category() && 
+                    SSL_R_CERTIFICATE_VERIFY_FAILED == ERR_GET_REASON(errcode.value())){
                     return handleError(
                         CONN_HANDSHAKE_FAILED,
-                        getMessage(ERR_CONN_SSL_CERTVERIFY, errmsg));
+                        getMessage(ERR_CONN_SSL_CERTVERIFY, in_err.what()));
                 }
                 else{
                     return handleError(
                         CONN_HANDSHAKE_FAILED,
-                        getMessage(ERR_CONN_SSL_GENERAL, errmsg));
+                        getMessage(ERR_CONN_SSL_GENERAL, in_err.what()));
                 }
             }
+#endif
     };
 
     class ChannelFactory{
@@ -312,7 +317,7 @@ class UserProperties;
 
                 // Sets the result back to the context.
                 context->SetCertHostnameVerificationStatus(verified);
-                return verified && in_preverified;
+                return verified;
             }
 
         private:
diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp
index 37f0ac1..82f24fd 100644
--- a/contrib/native/client/src/clientlib/errmsgs.cpp
+++ b/contrib/native/client/src/clientlib/errmsgs.cpp
@@ -58,7 +58,7 @@ static Drill::ErrorMessages errorMessages[]={
         " Please check connection parameters or contact administrator. [Warn: This"
         " could be due to a bad configuration or a security attack is in progress.]"},
     {ERR_CONN_SSL_GENERAL, ERR_CATEGORY_CONN, 0, "Encountered an exception during SSL handshake. [Details: %s]"},
-    {ERR_CONN_SSL_CN, ERR_CATEGORY_CONN, 0, "SSL certificate host name verification failure." },
+    {ERR_CONN_SSL_CN, ERR_CATEGORY_CONN, 0, "SSL certificate host name verification failure. [Details: %s]" },
     {ERR_CONN_SSL_CERTVERIFY, ERR_CATEGORY_CONN, 0, "SSL certificate verification failed. [Details: %s]"},
     {ERR_QRY_OUTOFMEM, ERR_CATEGORY_QRY, 0, "Out of memory."},
     {ERR_QRY_COMMERR, ERR_CATEGORY_QRY, 0, "Communication error. %s"},
diff --git a/contrib/native/client/src/clientlib/userProperties.cpp b/contrib/native/client/src/clientlib/userProperties.cpp
index f1aa82f..0ad8af1 100644
--- a/contrib/native/client/src/clientlib/userProperties.cpp
+++ b/contrib/native/client/src/clientlib/userProperties.cpp
@@ -35,6 +35,7 @@ const std::map<std::string, uint32_t>  DrillUserProperties::USER_PROPERTIES=boos
     ( USERPROP_DISABLE_HOSTVERIFICATION,    USERPROP_FLAGS_BOOLEAN|USERPROP_FLAGS_SSLPROP)
     ( USERPROP_DISABLE_CERTVERIFICATION,    USERPROP_FLAGS_BOOLEAN|USERPROP_FLAGS_SSLPROP)
     ( USERPROP_USESYSTEMTRUSTSTORE,    USERPROP_FLAGS_BOOLEAN|USERPROP_FLAGS_SSLPROP)
+    ( USERPROP_CUSTOM_SSLCTXOPTIONS,   USERPROP_FLAGS_STRING|USERPROP_FLAGS_SSLPROP)
     ( USERPROP_SASL_ENCRYPT,  USERPROP_FLAGS_STRING)
 ;
 
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 18cfc69..b5bb522 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -173,7 +173,8 @@ typedef enum{
 #define USERPROP_PASSWORD "password"
 #define USERPROP_SCHEMA   "schema"
 #define USERPROP_USESSL   "enableTLS"
-#define USERPROP_TLSPROTOCOL "TLSProtocol" //TLS version
+#define USERPROP_TLSPROTOCOL "TLSProtocol" // The exact TLS version to use.
+#define USERPROP_CUSTOM_SSLCTXOPTIONS "CustomSSLCtxOptions" // The custom SSL CTX options.
 #define USERPROP_CERTFILEPATH "certFilePath" // pem file path and name
 // TODO: support truststore protected by password. 
 // #define USERPROP_CERTPASSWORD "certPassword" // Password for certificate file. 

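For readers following the DRILL-6587 change above: the interesting part of the refactor is that handshake failures are now classified from the structured boost error (error category plus OpenSSL reason code) instead of string-comparing the message text, and the errmsgs templates carry the details through %s. A minimal, hypothetical analogue in Java (the client itself is C++; the class and method names here are invented for illustration):

    import javax.net.ssl.SSLHandshakeException;
    import javax.net.ssl.SSLPeerUnverifiedException;
    import java.security.cert.CertPathValidatorException;

    public final class HandshakeErrorClassifier {
        // Walk the cause chain and match on exception type -- the structured
        // equivalent of checking the boost error category and reason code.
        public static String classify(SSLHandshakeException e) {
            for (Throwable t = e; t != null; t = t.getCause()) {
                if (t instanceof SSLPeerUnverifiedException) {
                    // Parallels ERR_CONN_SSL_CN, which now also carries details.
                    return "SSL certificate host name verification failure. [Details: " + e.getMessage() + "]";
                }
                if (t instanceof CertPathValidatorException) {
                    // Parallels ERR_CONN_SSL_CERTVERIFY.
                    return "SSL certificate verification failed. [Details: " + e.getMessage() + "]";
                }
            }
            // Parallels ERR_CONN_SSL_GENERAL.
            return "Encountered an exception during SSL handshake. [Details: " + e.getMessage() + "]";
        }
    }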

[drill] 10/13: [DRILL-6586] Add SSL Hostname verification with zookeeper connection mode support

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b1eb9d76cda84661e5ebd6f1d87a5d5ee0501526
Author: superbstreak <ro...@gmail.com>
AuthorDate: Mon Jul 9 01:33:55 2018 -0700

    [DRILL-6586]  Add SSL Hostname verification with zookeeper connection mode support
---
 contrib/native/client/src/clientlib/channel.cpp |  7 +-----
 contrib/native/client/src/clientlib/channel.hpp | 33 +++++++++++++++----------
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/contrib/native/client/src/clientlib/channel.cpp b/contrib/native/client/src/clientlib/channel.cpp
index fc97816..e368cd0 100644
--- a/contrib/native/client/src/clientlib/channel.cpp
+++ b/contrib/native/client/src/clientlib/channel.cpp
@@ -376,13 +376,8 @@ connectionStatus_t SSLStreamChannel::init(){
     std::string disableHostVerification;
     props->getProp(USERPROP_DISABLE_HOSTVERIFICATION, disableHostVerification);
     if (disableHostVerification != "true") {
-        // Populate endpoint information before we retrieve host name.
-        m_pEndpoint->parseConnectString();
-        std::string hostStr  = m_pEndpoint->getHost();
         ((SSLChannelContext_t *) m_pContext)->getSslContext().set_verify_callback(
-                DrillSSLHostnameVerifier(
-                    ((SSLChannelContext_t *)m_pContext), 
-                    boost::asio::ssl::rfc2818_verification(hostStr.c_str())));
+                DrillSSLHostnameVerifier(this));
     }
 
     m_pSocket=new SslSocket(m_ioService, ((SSLChannelContext_t*)m_pContext)->getSslContext() );
diff --git a/contrib/native/client/src/clientlib/channel.hpp b/contrib/native/client/src/clientlib/channel.hpp
index e739118..76bedde 100644
--- a/contrib/native/client/src/clientlib/channel.hpp
+++ b/contrib/native/client/src/clientlib/channel.hpp
@@ -41,13 +41,14 @@ class UserProperties;
 
             //parse the connection string and set up the host and port to connect to
             connectionStatus_t getDrillbitEndpoint();
-            void parseConnectString();
+            
             const std::string& getProtocol() const {return m_protocol;}
             const std::string& getHost() const {return m_host;}
             const std::string& getPort() const {return m_port;}
             DrillClientError* getError(){ return m_pError;};
 
         private:
+            void parseConnectString();
             bool isDirectConnection();
             bool isZookeeperConnection();
             connectionStatus_t getDrillbitEndpointFromZk();
@@ -171,6 +172,8 @@ class UserProperties;
 
             ConnectionEndpoint* getEndpoint(){return m_pEndpoint;}
 
+            ChannelContext_t* getChannelContext(){ return m_pContext; }
+
         protected:
             connectionStatus_t handleError(connectionStatus_t status, std::string msg);
 
@@ -276,11 +279,8 @@ class UserProperties;
         public:
             /// @brief The constructor.
             /// 
-            /// @param in_pctx                  The SSL Channel Context.
-            /// @param in_verifier              The wrapped verifier.
-            DrillSSLHostnameVerifier(SSLChannelContext_t* in_pctx, boost::asio::ssl::rfc2818_verification in_verifier) : 
-                m_verifier(in_verifier),
-                m_pctx(in_pctx){
+            /// @param in_channel                  The Channel.
+            DrillSSLHostnameVerifier(Channel* in_channel) : m_channel(in_channel){
                 DRILL_LOG(LOG_INFO)
                     << "DrillSSLHostnameVerifier::DrillSSLHostnameVerifier: +++++ Enter +++++" 
                     << std::endl;
@@ -295,23 +295,30 @@ class UserProperties;
                 boost::asio::ssl::verify_context& in_ctx){
                 DRILL_LOG(LOG_INFO) << "DrillSSLHostnameVerifier::operator(): +++++ Enter +++++" << std::endl;
 
-                bool verified = m_verifier(in_preverified, in_ctx);
+                // Gets the channel context.
+                SSLChannelContext_t* context = (SSLChannelContext_t*)(m_channel->getChannelContext());
+
+                // Retrieve the host before we perform Host name verification.
+                // This is because host with ZK mode is selected after the connect() function is called.
+                boost::asio::ssl::rfc2818_verification verifier(m_channel->getEndpoint()->getHost().c_str());
+
+                // Perform verification.
+                bool verified = verifier(in_preverified, in_ctx);
 
                 DRILL_LOG(LOG_DEBUG) 
                     << "DrillSSLHostnameVerifier::operator(): Verification Result: " 
                     << verified 
                     << std::endl;
 
-                m_pctx->SetCertHostnameVerificationStatus(verified);
-                return verified;
+                // Sets the result back to the context.
+                context->SetCertHostnameVerificationStatus(verified);
+                return verified && in_preverified;
             }
 
         private:
-            // The inner verifier.
-            boost::asio::ssl::rfc2818_verification m_verifier;
 
-            // The SSL channel context.
-            SSLChannelContext_t* m_pctx;
+            // The SSL channel.
+            Channel* m_channel;
     };
 
 } // namespace Drill

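The key idea in this commit: hostname verification must resolve the expected host lazily, at handshake time, because in ZooKeeper mode the drillbit endpoint is only chosen after connect() has run, so DrillSSLHostnameVerifier now holds the Channel and asks it for the host inside operator(). A simplified, hypothetical sketch of the same pattern in Java (the real client is C++ wrapping boost::asio's rfc2818_verification; this sketch only illustrates the lazy lookup, not full certificate checking):

    import javax.net.ssl.HostnameVerifier;
    import javax.net.ssl.SSLSession;
    import java.util.function.Supplier;

    final class LazyHostnameVerifier implements HostnameVerifier {
        private final Supplier<String> endpointHost; // queried per handshake, not captured up front

        LazyHostnameVerifier(Supplier<String> endpointHost) {
            this.endpointHost = endpointHost;
        }

        @Override
        public boolean verify(String hostname, SSLSession session) {
            // Defer to the endpoint selected most recently (e.g., by ZooKeeper),
            // mirroring the verifier fetching the host from the Channel at call
            // time instead of in its constructor.
            return endpointHost.get().equalsIgnoreCase(hostname);
        }
    }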

[drill] 12/13: DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 4168e1e84d57b15d7667f7a768a0a47a577d0e79
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Mon Jul 9 17:58:08 2018 -0700

    DRILL-6542 : IndexOutOfBoundsException for multilevel lateral queries with schema changed partitioned complex data
    
    closes #1374
---
 .../drill/exec/physical/impl/unnest/Unnest.java    |  2 +
 .../exec/physical/impl/unnest/UnnestImpl.java      | 20 +++++++++
 .../physical/impl/unnest/UnnestRecordBatch.java    | 50 ++++++++++++++-------
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  | 51 ++++++++++++++++++++++
 4 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
index 77a2ffa..1a042b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
@@ -64,4 +64,6 @@ public interface Unnest {
    * time a new batch comes in.
    */
   void resetGroupIndex();
+
+  void close();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index ffc64f9..1d3b8f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.slf4j.Logger;
@@ -51,6 +52,7 @@ public class UnnestImpl implements Unnest {
   private SelectionVectorMode svMode;
   private RepeatedValueVector fieldToUnnest;
   private RepeatedValueVector.RepeatedAccessor accessor;
+  private RecordBatch outgoing;
 
   /**
    * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
@@ -97,8 +99,16 @@ public class UnnestImpl implements Unnest {
 
     logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", innerValueCount,
         recordCount, outputLimit);
+    final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
       t.splitAndTransfer(innerValueIndex, count);
+
+      // Get the corresponding ValueVector in output container and transfer the data
+      final ValueVector vectorWithData = t.getTo();
+      final ValueVector outputVector = outgoing.getContainer().addOrGet(vectorWithData.getField(), callBack);
+      Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have " +
+        "expected ValueVector of type %s, present in TransferPair of unnest field", vectorWithData.getClass());
+      vectorWithData.makeTransferPair(outputVector).transfer();
     }
     innerValueIndex += count;
     return count;
@@ -110,6 +120,7 @@ public class UnnestImpl implements Unnest {
       List<TransferPair> transfers, LateralContract lateral) throws SchemaChangeException {
 
     this.svMode = incoming.getSchema().getSelectionVectorMode();
+    this.outgoing = outgoing;
     if (svMode == NONE) {
       this.transfers = ImmutableList.copyOf(transfers);
       this.lateral = lateral;
@@ -123,4 +134,13 @@ public class UnnestImpl implements Unnest {
     this.innerValueIndex = 0;
   }
 
+  @Override
+  public void close() {
+    if (transfers != null) {
+      for (TransferPair tp : transfers) {
+        tp.getTo().close();
+      }
+      transfers = null;
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 9c1e702..d985423 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -48,7 +49,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
 public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
 
-  private Unnest unnest;
+  private Unnest unnest = new UnnestImpl();
   private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
                                         // sent. The next iteration, we need to make sure the record batch sizer
                                         // is updated before we process the actual data.
@@ -234,8 +235,23 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
             return IterOutcome.STOP;
           }
           return OK_NEW_SCHEMA;
-        }
-        // else
+        } else { // Unnest field schema didn't change, but a new left batch (empty or non-empty) might come with OK_NEW_SCHEMA
+          try {
+            // Even though there is no schema change for the unnest field, the reference of the unnest field's
+            // ValueVector may have changed, so we should refresh the transfer pairs and keep the output vector
+            // the same as before. When a new left batch is received with a schema change but is empty, Lateral will
+            // not call next on unnest and will change its left outcome to OK, whereas for a non-empty batch next
+            // will be called on unnest by Lateral. Hence UNNEST cannot rely on Lateral's current outcome to set up
+            // transfer pairs; it should do so for each new left incoming batch.
+            resetUnnestTransferPair();
+            container.zeroVectors();
+          } catch (SchemaChangeException ex) {
+            kill(false);
+            logger.error("Failure during query", ex);
+            context.getExecutorState().fail(ex);
+            return IterOutcome.STOP;
+          }
+        } // else
         unnest.resetGroupIndex();
         memoryManager.update();
       }
@@ -353,26 +369,27 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     return tp;
   }
 
-  @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
-    Preconditions.checkNotNull(lateral);
-    container.clear();
-    recordCount = 0;
+  private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
     final List<TransferPair> transfers = Lists.newArrayList();
-
     final FieldReference fieldReference = new FieldReference(popConfig.getColumn());
-
     final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
-
-    final ValueVector unnestVector = transferPair.getTo();
     transfers.add(transferPair);
-    container.add(unnestVector);
     logger.debug("Added transfer for unnest expression.");
-    container.buildSchema(SelectionVectorMode.NONE);
-
-    this.unnest = new UnnestImpl();
+    unnest.close();
     unnest.setup(context, incoming, this, transfers, lateral);
     setUnnestVector();
+    return transferPair;
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    Preconditions.checkNotNull(lateral);
+    container.clear();
+    recordCount = 0;
+    unnest = new UnnestImpl();
+    final TransferPair tp = resetUnnestTransferPair();
+    container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
+    container.buildSchema(SelectionVectorMode.NONE);
     return true;
   }
 
@@ -428,6 +445,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   @Override
   public void close() {
     updateStats();
+    unnest.close();
     super.close();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 17a9d33..394e732 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -370,6 +370,37 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     }
   }
 
+  /**
+   * This test differs from {@link TestE2EUnnestAndLateral#testSchemaChangeOnNonUnnestColumn()} because in the
+   * multilevel case, when the first Lateral sees a schema change it creates a new batch with new vector references.
+   * Hence the second Lateral will receive a new incoming with new vector references and an OK_NEW_SCHEMA outcome.
+   * Even though the schema change is on a non-unnest column, the second Unnest has to set up its transfer pairs
+   * again, since the vector reference for the unnest field has changed for the second Unnest.
+   * In the other test, since there is only one Lateral followed by a Scan, the incoming batch with the schema
+   * change is handled by the Scan in such a way that it only updates the vector of the affected column. Hence the
+   * vector corresponding to the unnest field is not affected and it works fine.
+   * @throws Exception
+   */
+  @Test
+  public void testSchemaChangeOnNonUnnestColumn_InMultilevelCase() throws Exception {
+
+    try {
+      dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+      String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+        "orders.totalprice, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+        "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+        "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems " +
+        "FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+        "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+        "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+      test(sql);
+    } catch (Exception ex) {
+      fail();
+    } finally {
+      dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_1));
+    }
+  }
+
   @Test
   public void testSchemaChangeOnUnnestColumn() throws Exception {
     try {
@@ -387,6 +418,26 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
   }
 
   @Test
+  public void testSchemaChangeOnUnnestColumn_InMultilevelCase() throws Exception {
+    try {
+      dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+
+      String sql = "SELECT customer.c_custkey, customer.c_name, customer.c_nationkey, orders.orderkey, " +
+        "orders.totalprice, orders.spriority, olineitems.l_partkey, olineitems.l_linenumber, olineitems.l_quantity " +
+        "FROM dfs.`lateraljoin/multipleFiles` customer, " +
+        "LATERAL (SELECT t1.o.o_orderkey as orderkey, t1.o.o_totalprice as totalprice, t1.o.o_lineitems as lineitems," +
+        " t1.o.o_shippriority as spriority FROM UNNEST(customer.c_orders) t1(o)) orders, " +
+        "LATERAL (SELECT t2.l.l_partkey as l_partkey, t2.l.l_linenumber as l_linenumber, t2.l.l_quantity as l_quantity " +
+        "FROM UNNEST(orders.lineitems) t2(l)) olineitems";
+      test(sql);
+    } catch (Exception ex) {
+      fail();
+    } finally {
+      dirTestWatcher.removeFileFromRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_2));
+    }
+  }
+
+  @Test
   public void testSchemaChangeOnMultipleColumns() throws Exception {
     try {
       dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", "multipleFiles", schemaChangeFile_3));

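The heart of the DRILL-6542 fix is the two-step handoff in UnnestImpl.unnestRecords(), condensed below with commentary (a paraphrase of the diff above that relies on Drill-internal types, not a standalone program): values are first sliced into the transfer pair's scratch vector, then moved into the vector already registered in the outgoing container, so downstream operators keep stable vector references even when the incoming batch arrives with new ones.

    final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
    for (TransferPair t : transfers) {
      // Step 1: slice this row's repeated values into the pair's destination vector.
      t.splitAndTransfer(innerValueIndex, count);

      // Step 2: hand the data to the vector already present in the outgoing
      // container, looked up by field, so its reference stays stable across batches.
      final ValueVector filled = t.getTo();
      final ValueVector output = outgoing.getContainer().addOrGet(filled.getField(), callBack);
      filled.makeTransferPair(output).transfer();
    }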

[drill] 04/13: DRILL-6559: Travis timing out

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d4f3304178f09f9fd7593ea70ffbff949c289e82
Author: Vitalii Diravka <vi...@gmail.com>
AuthorDate: Mon Jul 2 00:47:13 2018 +0300

    DRILL-6559: Travis timing out
    
    * Excluding contrib module tests and all exec TPCH unit tests from the Travis full build
      (see the category-tagging sketch after the diff below).
    * Introducing a new TRAVIS profile.
    * Travis build is faster by 4-5 minutes.
    
    closes #1364
---
 .travis.yml                                                      | 2 +-
 contrib/pom.xml                                                  | 9 +++++++++
 .../java/org/apache/drill/exec/store/mongo/MongoTestSuit.java    | 4 ++++
 .../apache/drill/exec/store/mongo/TestMongoChunkAssignment.java  | 3 ++-
 .../java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java | 4 ++--
 .../src/test/java/org/apache/drill/TestTpchSingleMode.java       | 3 +++
 6 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 3dce22c..afbfb09 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,4 +22,4 @@ cache:
   directories:
   - "$HOME/.m2"
 install: MAVEN_OPTS="-Xms1G -Xmx1G" mvn install --batch-mode -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.source.skip=true
-script: mvn install -Drat.skip=false -Dlicense.skip=false -DexcludedGroups="org.apache.drill.categories.SlowTest,org.apache.drill.categories.UnlikelyTest,org.apache.drill.categories.SecurityTest" -DforkCount=1 -DmemoryMb=2560 -DdirectMemoryMb=4608
+script: mvn install -Drat.skip=false -Dlicense.skip=false -DexcludedGroups="org.apache.drill.categories.SlowTest,org.apache.drill.categories.UnlikelyTest,org.apache.drill.categories.SecurityTest" -DforkCount=1 -DmemoryMb=2560 -DdirectMemoryMb=4608 -Ptravis
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 6ae2330..5d45d9d 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -47,4 +47,13 @@
     <module>data</module>
     <module>gis</module>
   </modules>
+
+  <profiles>
+    <profile>
+      <id>travis</id>
+      <properties>
+        <skipTests>true</skipTests>
+      </properties>
+    </profile>
+  </profiles>
 </project>
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
index 4e5ca37..33431a6 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
@@ -26,10 +26,13 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.categories.MongoStorageTest;
+import org.apache.drill.categories.SlowTest;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
@@ -63,6 +66,7 @@ import de.flapdoodle.embed.process.runtime.Network;
 @RunWith(Suite.class)
 @SuiteClasses({ TestMongoFilterPushDown.class, TestMongoProjectPushDown.class,
     TestMongoQueries.class, TestMongoChunkAssignment.class })
+@Category({SlowTest.class, MongoStorageTest.class})
 public class MongoTestSuit implements MongoTestConstants {
 
   private static final Logger logger = LoggerFactory
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
index 638cacb..efe450f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoChunkAssignment.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.drill.categories.MongoStorageTest;
+import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.mongo.common.ChunkInfo;
@@ -39,7 +40,7 @@ import com.google.common.collect.Sets;
 import com.mongodb.ServerAddress;
 import org.junit.experimental.categories.Category;
 
-@Category(MongoStorageTest.class)
+@Category({SlowTest.class, MongoStorageTest.class})
 public class TestMongoChunkAssignment {
   static final String HOST_A = "A";
   static final String HOST_B = "B";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
index 51aea18..e84bb27 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
@@ -17,13 +17,13 @@
  */
 package org.apache.drill;
 
-import org.apache.drill.categories.PlannerTest;
+import org.apache.drill.categories.SlowTest;
 import org.apache.drill.test.BaseTestQuery;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(PlannerTest.class)
+@Category({SlowTest.class})
 public class TestTpchLimit0 extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchLimit0.class);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
index a3e34c0..dc1848b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
@@ -17,10 +17,13 @@
  */
 package org.apache.drill;
 
+import org.apache.drill.categories.SlowTest;
 import org.apache.drill.test.BaseTestQuery;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+@Category({SlowTest.class})
 public class TestTpchSingleMode extends BaseTestQuery {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchSingleMode.class);
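
To make the category mechanism in this commit concrete: a JUnit category is just a marker interface referenced from @Category, and surefire skips any test whose category appears in -DexcludedGroups, as the .travis.yml script line above does for SlowTest, UnlikelyTest and SecurityTest. A minimal, hypothetical test class (not part of the commit) showing the tagging:

    import org.apache.drill.categories.SlowTest;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    @Category({SlowTest.class})
    public class ExampleSlowTest {
      @Test
      public void longRunningCheck() {
        // Runs in full local builds, but is excluded on Travis by
        // -DexcludedGroups="org.apache.drill.categories.SlowTest,...".
      }
    }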