Posted to commits@drill.apache.org by am...@apache.org on 2017/03/20 15:56:54 UTC

[1/7] drill git commit: DRILL-5226: Managed external sort fixes

Repository: drill
Updated Branches:
  refs/heads/master 11caa85c3 -> adbf363d2


DRILL-5226: Managed external sort fixes

* Memory leak in managed sort if OOM during sv2 allocation
* "Record batch sizer" does not include overhead for variable-sized
vectors
* Paranoid double-checking of merge batch sizes to prevent OOM when the
sizes differ from expectations
* Revised logging

Addresses review comments

close apache/drill#767
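
The sv2 leak fix boils down to a guard around the allocation, as the
ExternalSortBatch diff below shows; a minimal sketch of the pattern:

  SelectionVector2 sv2;
  try {
    sv2 = makeSelectionVector();
  } catch (Exception e) {
    // If sv2 allocation fails (e.g. OOM), release the freshly converted
    // batch before rethrowing so its buffers are not leaked.
    convertedBatch.clear();
    throw e;
  }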


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8656c83b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8656c83b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8656c83b

Branch: refs/heads/master
Commit: 8656c83b00f8ab09fb6817e4e9943b2211772541
Parents: 11caa85
Author: Paul Rogers <pr...@maprtech.com>
Authored: Thu Mar 2 16:09:01 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Mon Mar 20 07:56:31 2017 -0700

----------------------------------------------------------------------
 .../physical/impl/spill/RecordBatchSizer.java   | 32 ++++-----
 .../physical/impl/xsort/managed/BatchGroup.java |  9 ++-
 .../impl/xsort/managed/CopierHolder.java        | 17 ++++-
 .../impl/xsort/managed/ExternalSortBatch.java   | 71 ++++++++++++++++----
 .../templates/VariableLengthVectors.java        |  7 +-
 5 files changed, 104 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8656c83b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 22b1b0e..b384e0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -107,21 +107,23 @@ public class RecordBatchSizer {
 
     @Override
     public String toString() {
-      StringBuilder buf = new StringBuilder();
-      buf.append(metadata.getName());
-      buf.append("(std col. size: ");
-      buf.append(stdSize);
-      buf.append(", actual col. size: ");
-      buf.append(estSize);
-      buf.append(", total size: ");
-      buf.append(totalSize);
-      buf.append(", data size: ");
-      buf.append(dataSize);
-      buf.append(", row capacity: ");
-      buf.append(capacity);
-      buf.append(", density: ");
-      buf.append(density);
-      buf.append(")");
+      StringBuilder buf = new StringBuilder()
+          .append(metadata.getName())
+          .append("(type: ")
+          .append(metadata.getType().getMinorType().name())
+          .append(", std col. size: ")
+          .append(stdSize)
+          .append(", actual col. size: ")
+          .append(estSize)
+          .append(", total size: ")
+          .append(totalSize)
+          .append(", data size: ")
+          .append(dataSize)
+          .append(", row capacity: ")
+          .append(capacity)
+          .append(", density: ")
+          .append(density)
+          .append(")");
       return buf.toString();
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8656c83b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
index 7ea599c..2e5d5b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -150,7 +150,8 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
     private String path;
     private SpillSet spillSet;
     private BufferAllocator allocator;
-    private int spilledBatches = 0;
+    private int spilledBatches;
+    private long batchSize;
 
     public SpilledRun(SpillSet spillSet, String path, OperatorContext context) throws IOException {
       super(null, context);
@@ -178,6 +179,12 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
       currentContainer.setRecordCount(0);
     }
 
+    public void setBatchSize(long batchSize) {
+      this.batchSize = batchSize;
+    }
+
+    public long getBatchSize() { return batchSize; }
+
     @Override
     public int getNextIndex() {
       if (pointer == getRecordCount()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/8656c83b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
index 4fa520d..c6b2dd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
@@ -176,6 +176,7 @@ public class CopierHolder {
     private int targetRecordCount;
     private int copyCount;
     private int batchCount;
+    private long estBatchSize;
 
     /**
     * Creates a merger with a temporary output container.
@@ -229,15 +230,18 @@ public class CopierHolder {
     @Override
     public boolean next() {
       Stopwatch w = Stopwatch.createStarted();
+      long start = holder.allocator.getAllocatedMemory();
       int count = holder.copier.next(targetRecordCount);
       copyCount += count;
       if (count > 0) {
         long t = w.elapsed(TimeUnit.MICROSECONDS);
+        batchCount++;
         logger.trace("Took {} us to merge {} records", t, count);
+        long size = holder.allocator.getAllocatedMemory() - start;
+        estBatchSize = Math.max(estBatchSize, size);
       } else {
         logger.trace("copier returned 0 records");
       }
-      batchCount++;
 
       // Identify the schema to be used in the output container. (Since
       // all merged batches have the same schema, the schema we identify
@@ -303,5 +307,16 @@ public class CopierHolder {
     public int getBatchCount() {
       return batchCount;
     }
+
+    /**
+     * Gets the estimated batch size, in bytes. Use for estimating the memory
+     * needed to process the batches that this operator created.
+     * @return the size of the largest batch created by this operation,
+     * in bytes
+     */
+
+    public long getEstBatchSize() {
+      return estBatchSize;
+    }
   }
 }
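
Note how the estimate is taken: next() samples holder.allocator.getAllocatedMemory()
before and after each copy and keeps the maximum delta. ExternalSortBatch (below)
records that value on each spilled run via newGroup.setBatchSize(merger.getEstBatchSize()),
so later merges can be budgeted against observed batch sizes rather than estimates.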

http://git-wip-us.apache.org/repos/asf/drill/blob/8656c83b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index a1162a0..69e9b4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -182,13 +183,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
 
   /**
-   * The preferred amount of memory to set aside to output batches
-   * expressed as a ratio of available memory.
-   */
-
-  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
-
-  /**
    * In the bizarre case where the user gave us an unrealistically low
    * spill file size, set a floor at some bare minimum size. (Note that,
    * at this size, big queries will create a huge number of files, which
@@ -777,6 +771,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    * @return the converted batch, or null if the incoming batch is empty
    */
 
+  @SuppressWarnings("resource")
   private VectorContainer convertBatch() {
 
     // Must accept the batch even if no records. Then clear
@@ -788,6 +783,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       for (VectorWrapper<?> w : convertedBatch) {
         w.clear();
       }
+      SelectionVector2 sv2 = incoming.getSelectionVector2();
+      if (sv2 != null) {
+        sv2.clear();
+      }
       return null;
     }
     return convertedBatch;
@@ -852,7 +851,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       return;
     }
 
-    SelectionVector2 sv2 = makeSelectionVector();
+    SelectionVector2 sv2;
+    try {
+      sv2 = makeSelectionVector();
+    } catch (Exception e) {
+      convertedBatch.clear();
+      throw e;
+    }
 
     // Compute batch size, including allocation of an sv2.
 
@@ -994,7 +999,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     // Maintain an estimate of the incoming batch size: the largest
     // batch yet seen. Used to reserve memory for the next incoming
-    // batch.
+    // batch. Because we are using the actual observed batch size,
+    // the size already includes overhead due to power-of-two rounding.
 
     long origInputBatchSize = estimatedInputBatchSize;
     estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
@@ -1013,7 +1019,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       return; }
 
     // Estimate the total size of each incoming batch plus sv2. Note that, due
-    // to power-of-two rounding, the allocated size might be twice the data size.
+    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
 
     long estimatedInputSize = estimatedInputBatchSize + 4 * actualRecordCount;
 
@@ -1290,7 +1296,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     logger.trace("Merging {} on-disk runs, Alloc. memory = {}",
         mergeCount, allocator.getAllocatedMemory());
-    mergeAndSpill(spilledRuns, mergeCount);
+    mergeRuns(mergeCount);
     return true;
   }
 
@@ -1333,8 +1339,43 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     mergeAndSpill(bufferedBatches, spillCount);
   }
 
+  private void mergeRuns(int targetCount) {
+
+    // Determine the number of runs to merge. The count should be the
+    // target count. However, to prevent possible memory overrun, we
+    // double-check with actual spill batch size and only spill as much
+    // as fits in the merge memory pool.
+
+    int mergeCount = 0;
+    long mergeSize = 0;
+    for (SpilledRun run : spilledRuns) {
+      long batchSize = run.getBatchSize();
+      if (mergeSize + batchSize > mergeMemoryPool) {
+        break;
+      }
+      mergeSize += batchSize;
+      mergeCount++;
+      if (mergeCount == targetCount) {
+        break;
+      }
+    }
+
+    // Must always spill at least 2, even if this creates an over-size
+    // spill file. But, if this is a final consolidation, we may have only
+    // a single batch.
+
+    mergeCount = Math.max(mergeCount, 2);
+    mergeCount = Math.min(mergeCount, spilledRuns.size());
+
+    // Do the actual spill.
+
+    mergeAndSpill(spilledRuns, mergeCount);
+  }
+
   private void mergeAndSpill(LinkedList<? extends BatchGroup> source, int count) {
     spilledRuns.add(doMergeAndSpill(source, count));
+    logger.trace("Completed spill: memory = {}",
+                 allocator.getAllocatedMemory());
   }
 
   private BatchGroup.SpilledRun doMergeAndSpill(LinkedList<? extends BatchGroup> batchGroups, int spillCount) {
@@ -1354,7 +1395,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     BatchGroup.SpilledRun newGroup = null;
     try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
          CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, spillBatchRowCount)) {
-      logger.trace("Spilling {} of {} batches, {} rows, memory = {}, write to {}",
+      logger.trace("Spilling {} of {} batches, spill batch size = {} rows, memory = {}, write to {}",
                    batchesToSpill.size(), bufferedBatches.size() + batchesToSpill.size(),
                    spillBatchRowCount,
                    allocator.getAllocatedMemory(), outputFile);
@@ -1375,8 +1416,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       }
       injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
       newGroup.closeOutputStream();
-      logger.trace("mergeAndSpill: completed, memory = {}, spilled {} records to {}",
-                   allocator.getAllocatedMemory(), merger.getRecordCount(), outputFile);
+      logger.trace("Spilled {} batches, {} records; memory = {} to {}",
+                   merger.getBatchCount(), merger.getRecordCount(),
+                   allocator.getAllocatedMemory(), outputFile);
+      newGroup.setBatchSize(merger.getEstBatchSize());
       return newGroup;
     } catch (Throwable e) {
       // we only need to clean up newGroup if spill failed
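
A worked example of the double-check in mergeRuns(): with a hypothetical 40 MB
merge memory pool and spilled runs whose recorded batch sizes are 12 MB, 12 MB,
and 18 MB, the loop admits the first two runs (12 + 12 = 24 MB) but stops before
the third (24 + 18 = 42 MB > 40 MB), so only two runs are merged even if the
target count was higher. The Math.max/Math.min clamps then ensure at least two
runs are merged (when two exist) and never more than remain.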

http://git-wip-us.apache.org/repos/asf/drill/blob/8656c83b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index ea3c9de..bb1d4fb 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -253,7 +253,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       // If 1 or more values, then the last value is set to
       // the offset of the next value, which is the same as
       // the length of existing values.
-      return a.get(count-1);
+      // In addition to the actual data bytes, we must also
+      // include the "overhead" bytes: the offset vector entries
+      // that accompany each column value. Thus, total payload
+      // size is consumed text bytes + consumed offset vector
+      // bytes.
+      return a.get(count-1) + offsetVector.getPayloadByteCount();
     }
   }
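
A worked example of the revised accounting (assuming Drill's standard 4-byte
offset entries): a VarChar vector holding 1,000 values totaling 8,000 bytes of
text also carries roughly 4,000 bytes of offset-vector entries, so the payload
reported is about 12,000 bytes rather than 8,000; this is the overhead the
record batch sizer previously missed.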
 


[3/7] drill git commit: DRILL-5316: Check drillbits size before we attempt to access the vector element

Posted by am...@apache.org.
DRILL-5316: Check drillbits size before we attempt to access the vector element

close apache/drill#772


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/90bc8005
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/90bc8005
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/90bc8005

Branch: refs/heads/master
Commit: 90bc80052f66874cf9eaf8b8bc93f28f703b5506
Parents: 97e2a1d
Author: Rob Wu <ro...@gmail.com>
Authored: Mon Mar 6 18:17:25 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Mon Mar 20 08:08:13 2017 -0700

----------------------------------------------------------------------
 contrib/native/client/src/clientlib/drillClientImpl.cpp | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/90bc8005/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index a41cb4b..9d45f57 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -86,6 +86,9 @@ connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProper
         std::vector<std::string> drillbits;
         int err = zook.getAllDrillbits(hostPortStr, drillbits);
         if(!err){
+            if (drillbits.empty()){
+                return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
+            }
             Utils::shuffle(drillbits);
             exec::DrillbitEndpoint endpoint;
             err = zook.getEndPoint(drillbits[drillbits.size() -1], endpoint);// get the last one in the list
@@ -2142,6 +2145,9 @@ connectionStatus_t PooledDrillClientImpl::connect(const char* connStr, DrillUser
         std::vector<std::string> drillbits;
         int err = zook.getAllDrillbits(hostPortStr, drillbits);
         if(!err){
+            if (drillbits.empty()){
+                return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
+            }
             Utils::shuffle(drillbits);
             // The original shuffled order is maintained if we shuffle first and then add any missing elements
             Utils::add(m_drillbits, drillbits);


[4/7] drill git commit: DRILL-5330: NPE in FunctionImplementationRegistry

Posted by am...@apache.org.
DRILL-5330: NPE in FunctionImplementationRegistry

Fixes:

* DRILL-5330: NPE in
FunctionImplementationRegistry.functionReplacement()
* DRILL-5331:
NPE in FunctionImplementationRegistry.findDrillFunction() if dynamic
UDFs disabled

When running in a unit test, the dynamic UDF (DUDF) mechanism is not
available. When running in production, the DUDF mechanism is available,
but may be disabled.

One confusing aspect of this code is that the function registry
is given the option manager, but the option manager is not yet valid
(not yet initialized) in the function registry constructor. So, we
cannot access the option manager in the function registry constructor.

In any event, the existing system options cannot be used to disable DUDF
support. For obscure reasons, DUDF support is always enabled, even when
disabled by the user.

Instead, for DRILL-5331, we added a config option to "really" disable DUDFs.
The property, set only by tests, disables DUDF support.
Note that, in the future, this option could be generalized to
"off, read-only, on" to capture the full set of DUDF modes.
But, for now, just turning this off is sufficient.
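
A sketch of how a test might flip the new boot-time option (using the Typesafe
Config API that underlies drill-module.conf; the wiring shown here is an
assumption, not Drill's actual test bootstrap):

  import com.typesafe.config.Config;
  import com.typesafe.config.ConfigFactory;

  // Override the boot-time option that disables dynamic UDF lookup.
  Config config = ConfigFactory.parseString("drill.exec.udf.disable_dynamic: true")
      .withFallback(ConfigFactory.load());
  boolean useDynamicUdfs = !config.getBoolean("drill.exec.udf.disable_dynamic");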

For DRILL-5330, we use an existing option validator rather than
accessing the raw option directly.

Also includes a bit of code cleanup in the class in question.

The result is that the code now works when used in a sub-operator unit
test.

close apache/drill#777


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2f13c08f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2f13c08f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2f13c08f

Branch: refs/heads/master
Commit: 2f13c08f35152639e4619d2898b2ca8fe7115259
Parents: 90bc800
Author: Paul Rogers <pr...@maprtech.com>
Authored: Fri Mar 10 11:55:13 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Mon Mar 20 08:09:57 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +-
 .../coord/store/TransientStoreListener.java     |  2 +-
 .../expr/fn/FunctionImplementationRegistry.java | 82 +++++++++++++-------
 .../src/main/resources/drill-module.conf        |  3 +
 .../java/org/apache/drill/exec/ExecTest.java    |  2 +-
 .../exec/physical/impl/TestSimpleFunctions.java |  4 +-
 6 files changed, 60 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2f13c08f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index da3a312..91498fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -144,6 +144,7 @@ public interface ExecConstants {
   String UDF_DIRECTORY_STAGING = "drill.exec.udf.directory.staging";
   String UDF_DIRECTORY_REGISTRY = "drill.exec.udf.directory.registry";
   String UDF_DIRECTORY_TMP = "drill.exec.udf.directory.tmp";
+  String UDF_DISABLE_DYNAMIC = "drill.exec.udf.disable_dynamic";
 
   /**
    * Local temporary directory is used as base for temporary storage of Dynamic UDF jars.
@@ -264,7 +265,7 @@ public interface ExecConstants {
       SLICE_TARGET_DEFAULT);
 
   String CAST_TO_NULLABLE_NUMERIC = "drill.exec.functions.cast_empty_string_to_null";
-  OptionValidator CAST_TO_NULLABLE_NUMERIC_OPTION = new BooleanValidator(CAST_TO_NULLABLE_NUMERIC, false);
+  BooleanValidator CAST_TO_NULLABLE_NUMERIC_OPTION = new BooleanValidator(CAST_TO_NULLABLE_NUMERIC, false);
 
   /**
    * HashTable runtime settings

http://git-wip-us.apache.org/repos/asf/drill/blob/2f13c08f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
index ca8fa9d..3cd86f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
@@ -27,6 +27,6 @@ public interface TransientStoreListener {
    *
    * @param event  event details
    */
-  void onChange(TransientStoreEvent event);
+  void onChange(TransientStoreEvent<?> event);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2f13c08f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 5c7bfb4..c1ba2d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -83,17 +83,30 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
   private boolean deleteTmpDir = false;
   private File tmpDir;
   private List<PluggableFunctionRegistry> pluggableFuncRegistries = Lists.newArrayList();
-  private OptionManager optionManager = null;
+  private final OptionManager optionManager;
+  private final boolean useDynamicUdfs;
 
-  @Deprecated @VisibleForTesting
-  public FunctionImplementationRegistry(DrillConfig config){
-    this(config, ClassPathScanner.fromPrescan(config));
+  @VisibleForTesting
+  public FunctionImplementationRegistry(DrillConfig config) {
+    this(config, ClassPathScanner.fromPrescan(config), null);
   }
 
-  public FunctionImplementationRegistry(DrillConfig config, ScanResult classpathScan){
+  public FunctionImplementationRegistry(DrillConfig config, ScanResult classpathScan) {
+    this(config, classpathScan, null);
+  }
+
+  public FunctionImplementationRegistry(DrillConfig config, ScanResult classpathScan, OptionManager optionManager) {
     Stopwatch w = Stopwatch.createStarted();
 
     logger.debug("Generating function registry.");
+    this.optionManager = optionManager;
+
+    // Unit tests fail if dynamic UDFs are turned on AND the test happens
+    // to access an undefined function. Since we want a reasonable failure
+    // rather than a crash, we provide a boot-time option, set only by
+    // tests, to disable DUDF lookup.
+
+    useDynamicUdfs = ! config.getBoolean(ExecConstants.UDF_DISABLE_DYNAMIC);
     localFunctionRegistry = new LocalFunctionRegistry(classpathScan);
 
     Set<Class<? extends PluggableFunctionRegistry>> registryClasses =
@@ -123,11 +136,6 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
     this.localUdfDir = getLocalUdfDir(config);
   }
 
-  public FunctionImplementationRegistry(DrillConfig config, ScanResult classpathScan, OptionManager optionManager) {
-    this(config, classpathScan);
-    this.optionManager = optionManager;
-  }
-
   /**
    * Register functions in given operator table.
    * @param operatorTable operator table
@@ -142,7 +150,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
   }
 
   /**
-   * First attempts to finds the Drill function implementation that matches the name, arg types and return type.
+   * First attempts to find the Drill function implementation that matches the name, arg types and return type.
    * If exact function implementation was not found,
    * syncs local function registry with remote function registry if needed
    * and tries to find function implementation one more time
@@ -156,17 +164,25 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
   public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) {
     AtomicLong version = new AtomicLong();
     String newFunctionName = functionReplacement(functionCall);
-    List<DrillFuncHolder> functions = localFunctionRegistry.getMethods(newFunctionName, version);
-    FunctionResolver exactResolver = FunctionResolverFactory.getExactResolver(functionCall);
-    DrillFuncHolder holder = exactResolver.getBestMatch(functions, functionCall);
 
-    if (holder == null) {
+    // Dynamic UDFS: First try with exact match. If not found, we may need to
+    // update the registry, so sync with remote.
+
+    if (useDynamicUdfs) {
+      List<DrillFuncHolder> functions = localFunctionRegistry.getMethods(newFunctionName, version);
+      FunctionResolver exactResolver = FunctionResolverFactory.getExactResolver(functionCall);
+      DrillFuncHolder holder = exactResolver.getBestMatch(functions, functionCall);
+      if (holder != null) {
+        return holder;
+      }
       syncWithRemoteRegistry(version.get());
-      List<DrillFuncHolder> updatedFunctions = localFunctionRegistry.getMethods(newFunctionName, version);
-      holder = functionResolver.getBestMatch(updatedFunctions, functionCall);
     }
 
-    return holder;
+    // Whether Dynamic UDFs or not: look in the registry for
+    // an inexact match.
+
+    List<DrillFuncHolder> functions = localFunctionRegistry.getMethods(newFunctionName, version);
+    return functionResolver.getBestMatch(functions, functionCall);
   }
 
   /**
@@ -177,16 +193,20 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
    */
   private String functionReplacement(FunctionCall functionCall) {
     String funcName = functionCall.getName();
-      if (functionCall.args.size() > 0) {
-          MajorType majorType =  functionCall.args.get(0).getMajorType();
-          DataMode dataMode = majorType.getMode();
-          MinorType minorType = majorType.getMinorType();
-          if (optionManager != null
-              && optionManager.getOption(ExecConstants.CAST_TO_NULLABLE_NUMERIC).bool_val
-              && CastFunctions.isReplacementNeeded(funcName, minorType)) {
-              funcName = CastFunctions.getReplacingCastFunction(funcName, dataMode, minorType);
-          }
-      }
+    if (functionCall.args.size() == 0) {
+      return funcName;
+    }
+    boolean castToNullableNumeric = optionManager != null &&
+                  optionManager.getOption(ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION);
+    if (! castToNullableNumeric) {
+      return funcName;
+    }
+    MajorType majorType =  functionCall.args.get(0).getMajorType();
+    DataMode dataMode = majorType.getMode();
+    MinorType minorType = majorType.getMinorType();
+    if (CastFunctions.isReplacementNeeded(funcName, minorType)) {
+        funcName = CastFunctions.getReplacingCastFunction(funcName, dataMode, minorType);
+    }
 
     return funcName;
   }
@@ -200,7 +220,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
    * @return exactly matching function holder
    */
   public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) {
-    return findExactMatchingDrillFunction(name, argTypes, returnType, true);
+    return findExactMatchingDrillFunction(name, argTypes, returnType, useDynamicUdfs);
   }
 
   /**
@@ -315,6 +335,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
    * @param version remote function registry local function registry was based on
    * @return true if remote and local function registries were synchronized after given version
    */
+  @SuppressWarnings("resource")
   public boolean syncWithRemoteRegistry(long version) {
     if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
       synchronized (this) {
@@ -495,6 +516,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
    * @return local path to jar that was copied
    * @throws IOException in case of problems during jar coping process
    */
+  @SuppressWarnings("resource")
   private Path copyJarToLocal(String jarName, RemoteFunctionRegistry remoteFunctionRegistry) throws IOException {
     Path registryArea = remoteFunctionRegistry.getRegistryArea();
     FileSystem fs = remoteFunctionRegistry.getFs();
@@ -549,7 +571,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
   private class UnregistrationListener implements TransientStoreListener {
 
     @Override
-    public void onChange(TransientStoreEvent event) {
+    public void onChange(TransientStoreEvent<?> event) {
       String jarName = (String) event.getValue();
       localFunctionRegistry.unregister(jarName);
       String localDir = localUdfDir.toUri().getPath();
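
The net lookup order in findDrillFunction() is now: (1) when dynamic UDFs are
enabled, try an exact match against the local registry, returning on a hit and
syncing with the remote registry on a miss; (2) in all cases, fall back to an
inexact match through the supplied resolver. With DUDFs disabled, step 1 is
skipped entirely, which avoids the NPE paths reported in DRILL-5330/5331.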

http://git-wip-us.apache.org/repos/asf/drill/blob/2f13c08f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 9c2ba2f..3d66d19 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -245,6 +245,9 @@ drill.exec: {
   },
   udf: {
     retry-attempts: 5,
+    // Disables (parts of) the dynamic UDF functionality.
+    // Primarily for testing.
+    disable_dynamic: false,
     directory: {
       // Base directory for remote and local udf directories, unique among clusters.
       base: ${drill.exec.zk.root}"/udf",

http://git-wip-us.apache.org/repos/asf/drill/blob/2f13c08f/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index 4872909..dead858 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -52,7 +52,7 @@ public class ExecTest extends DrillTest {
     GuavaPatcher.patch();
   }
 
-  private static final DrillConfig c = DrillConfig.create();
+  protected static final DrillConfig c = DrillConfig.create();
 
   @After
   public void clear(){

http://git-wip-us.apache.org/repos/asf/drill/blob/2f13c08f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
index ede30e6..6c48651 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
@@ -61,11 +61,10 @@ import com.sun.codemodel.JClassAlreadyExistsException;
 import mockit.Injectable;
 
 public class TestSimpleFunctions extends ExecTest {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleFunctions.class);
-  private final DrillConfig c = DrillConfig.create();
 
   @Test
   public void testHashFunctionResolution() throws JClassAlreadyExistsException, IOException {
+    @SuppressWarnings("resource")
     final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
     // test required vs nullable Int input
     resolveHash(c,
@@ -133,7 +132,6 @@ public class TestSimpleFunctions extends ExecTest {
                                     FunctionImplementationRegistry registry) throws JClassAlreadyExistsException, IOException {
     final List<LogicalExpression> args = new ArrayList<>();
     args.add(arg);
-    final String[] registeredNames = { "hash" };
     FunctionCall call = new FunctionCall(
         "hash",
         args,


[6/7] drill git commit: DRILL-5352: Profile parser printing for multi fragments

Posted by am...@apache.org.
DRILL-5352: Profile parser printing for multi fragments

Enhances the recently added ProfileParser to display run times for
queries that contain multiple fragments. (The original version handled
just a single fragment.)

Prints the query in "classic" mode if it is linear, or in the new
semi-indented mode if the query forms a tree.

Also cleans up formatting - removing spaces inside parens.

Fixes from review

close apache/drill#782

* Fixed process time percent.
* Added support for getting operator profiles in a multi-fragment query.
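
The linear-vs-tree decision can be made with a single walk over the parsed
operator tree; a minimal sketch (hypothetical helper, not part of the patch,
using the OperatorSummary.children list introduced below):

  // A plan is "linear" when no operator has more than one child, so the
  // classic one-line-per-operator rendering suffices.
  static boolean isLinear(ProfileParser.OperatorSummary root) {
    ProfileParser.OperatorSummary node = root;
    while (node.children.size() == 1) {
      node = node.children.get(0);   // follow the single chain downward
    }
    return node.children.isEmpty();  // any branch makes the plan a tree
  }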


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a8046bee
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a8046bee
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a8046bee

Branch: refs/heads/master
Commit: a8046bee19a10e648e8633ec637f657b6fc164de
Parents: 1766ffc
Author: Paul Rogers <pr...@maprtech.com>
Authored: Mon Mar 13 20:43:25 2017 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Mon Mar 20 08:13:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/test/ProfileParser.java    | 478 +++++++++++++------
 1 file changed, 329 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a8046bee/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
index 6c99d8d..1dafef7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
@@ -47,24 +47,49 @@ import com.google.common.base.Preconditions;
 
 public class ProfileParser {
 
+  /**
+   * The original JSON profile.
+   */
+
   JsonObject profile;
+
+  /**
+   * Query text parsed out of the profile.
+   */
+
   String query;
+
+  /**
+   * List of operator plans in the order in which they appear in the query
+   * plan section of the profile. This is an intermediate representation used
+   * to create the more fully analyzed structures.
+   */
+
   List<String> plans;
 
   /**
    * Operations sorted by operator ID. The Operator ID serves as
    * an index into the list to get the information for that operator.
+   * Operator ID is the yy in the xx-yy notation shown in the plan.
+   * This is NOT the same as the operator type.
+   */
+
+  List<OperatorSummary> operations;
+
+  /**
+   * Map from major fragment number to fragment information. The major
+   * fragment number is the xx in the xx-yy notation in the plan.
    */
-  List<OpDefInfo> operations;
+
   Map<Integer,FragInfo> fragments = new HashMap<>();
 
   /**
    * Operations in the original topological order as shown in the text
    * version of the query plan in the query profile.
    */
-  private List<OpDefInfo> topoOrder;
+  private List<OperatorSummary> topoOrder;
 
-  public ProfileParser( File file ) throws IOException {
+  public ProfileParser(File file) throws IOException {
     try (FileReader fileReader = new FileReader(file);
          JsonReader reader = Json.createReader(fileReader)) {
       profile = (JsonObject) reader.read();
@@ -96,28 +121,28 @@ public class ProfileParser {
   private static class PlanParser {
 
     List<String> plans = new ArrayList<>();
-    List<OpDefInfo> operations = new ArrayList<>();
-    List<OpDefInfo> sorted = new ArrayList<>();
+    List<OperatorSummary> operations = new ArrayList<>();
+    List<OperatorSummary> sorted = new ArrayList<>();
 
     public void parsePlans(String plan) {
-      plans = new ArrayList<>( );
+      plans = new ArrayList<>();
       String parts[] = plan.split("\n");
       for (String part : parts) {
         plans.add(part);
-        OpDefInfo opDef = new OpDefInfo( part );
+        OperatorSummary opDef = new OperatorSummary(part);
         operations.add(opDef);
       }
       sortList();
     }
 
     private void sortList() {
-      List<OpDefInfo> raw = new ArrayList<>( );
-      raw.addAll( operations );
-      Collections.sort( raw, new Comparator<OpDefInfo>() {
+      List<OperatorSummary> raw = new ArrayList<>();
+      raw.addAll(operations);
+      Collections.sort(raw, new Comparator<OperatorSummary>() {
         @Override
-        public int compare(OpDefInfo o1, OpDefInfo o2) {
+        public int compare(OperatorSummary o1, OperatorSummary o2) {
           int result = Integer.compare(o1.majorId, o2.majorId);
-          if ( result == 0 ) {
+          if (result == 0) {
             result = Integer.compare(o1.stepId, o2.stepId);
           }
           return result;
@@ -125,38 +150,41 @@ public class ProfileParser {
       });
       int currentFrag = 0;
       int currentStep = 0;
-      for ( OpDefInfo opDef : raw ) {
-        if ( currentFrag < opDef.majorId ) {
+      for (OperatorSummary opDef : raw) {
+        if (currentFrag < opDef.majorId) {
           currentFrag++;
-          OpDefInfo sender = new OpDefInfo( currentFrag, 0 );
+          OperatorSummary sender = new OperatorSummary(currentFrag, 0);
           sender.isInferred = true;
           sender.name = "Sender";
           sorted.add(sender);
           currentStep = 1;
           opDef.inferredParent = sender;
-          sender.children.add( opDef );
+          sender.children.add(opDef);
         }
-        if ( opDef.stepId > currentStep ) {
-          OpDefInfo unknown = new OpDefInfo( currentFrag, currentStep );
+        if (opDef.stepId > currentStep) {
+          OperatorSummary unknown = new OperatorSummary(currentFrag, currentStep);
           unknown.isInferred = true;
           unknown.name = "Unknown";
           sorted.add(unknown);
           opDef.inferredParent = unknown;
-          unknown.children.add( opDef );
+          unknown.children.add(opDef);
         }
-        sorted.add( opDef );
+        sorted.add(opDef);
         currentStep = opDef.stepId + 1;
       }
     }
   }
 
   /**
-   * Parse the plan portion of the query profile.
+   * Parse the plan portion of the query profile. Unfortunately,
+   * the plan is in text form and is awkward to parse. Also, there is no ID
+   * to correlate operators shown in the plan with those referenced in the
+   * profile JSON. Inference is needed.
    */
 
   private void parsePlans() {
     PlanParser parser = new PlanParser();
-    String plan = getPlan( );
+    String plan = getPlan();
     parser.parsePlans(plan);
     plans = parser.plans;
     topoOrder = parser.operations;
@@ -164,7 +192,7 @@ public class ProfileParser {
   }
 
   private void buildFrags() {
-    for (OpDefInfo opDef : operations) {
+    for (OperatorSummary opDef : operations) {
       FragInfo major = fragments.get(opDef.majorId);
       if (major == null) {
         major = new FragInfo(opDef.majorId);
@@ -175,17 +203,17 @@ public class ProfileParser {
   }
 
   private static List<FieldDef> parseCols(String cols) {
-    String parts[] = cols.split( ", " );
-    List<FieldDef> fields = new ArrayList<>( );
-    for ( String part : parts ) {
-      String halves[] = part.split( " " );
-      fields.add( new FieldDef( halves[1], halves[0] ) );
+    String parts[] = cols.split(", ");
+    List<FieldDef> fields = new ArrayList<>();
+    for (String part : parts) {
+      String halves[] = part.split(" ");
+      fields.add(new FieldDef(halves[1], halves[0]));
     }
     return fields;
   }
 
   private void parseFragProfiles() {
-    JsonArray frags = getFragmentProfile( );
+    JsonArray frags = getFragmentProfile();
     for (JsonObject fragProfile : frags.getValuesAs(JsonObject.class)) {
       int mId = fragProfile.getInt("majorFragmentId");
       FragInfo major = fragments.get(mId);
@@ -209,13 +237,16 @@ public class ProfileParser {
 
   private void aggregateOpers() {
     for (FragInfo major : fragments.values()) {
-      for (OpDefInfo opDef : major.ops) {
+      for (OperatorSummary opDef : major.ops) {
         int sumPeak = 0;
-        for ( OperatorProfile op : opDef.opExecs) {
-          Preconditions.checkState( major.id == op.majorFragId );
-          Preconditions.checkState( opDef.stepId == op.opId );
+        opDef.execCount = opDef.opExecs.size();
+        for (OperatorProfile op : opDef.opExecs) {
+          Preconditions.checkState(major.id == op.majorFragId);
+          Preconditions.checkState(opDef.stepId == op.opId);
           opDef.actualRows += op.records;
           opDef.actualBatches += op.batches;
+          opDef.setupMs += op.setupMs;
+          opDef.processMs += op.processMs;
           sumPeak += op.peakMem;
         }
         opDef.actualMemory = sumPeak * 1024 * 1024;
@@ -229,11 +260,11 @@ public class ProfileParser {
 
   public void buildTree() {
     int currentLevel = 0;
-    OpDefInfo opStack[] = new OpDefInfo[topoOrder.size()];
-    for (OpDefInfo opDef : topoOrder) {
+    OperatorSummary opStack[] = new OperatorSummary[topoOrder.size()];
+    for (OperatorSummary opDef : topoOrder) {
       currentLevel = opDef.globalLevel;
       opStack[currentLevel] = opDef;
-      if ( opDef.inferredParent == null ) {
+      if (opDef.inferredParent == null) {
         if (currentLevel > 0) {
           opStack[currentLevel-1].children.add(opDef);
         }
@@ -244,7 +275,7 @@ public class ProfileParser {
   }
 
 
-  public String getQuery( ) {
+  public String getQuery() {
     return profile.getString("query");
   }
 
@@ -256,40 +287,40 @@ public class ProfileParser {
     return plans;
   }
 
-  public List<String> getScans( ) {
+  public List<String> getScans() {
     List<String> scans = new ArrayList<>();
-    int n = getPlans( ).size();
-    for ( int i = n-1; i >= 0;  i-- ) {
-      String plan = plans.get( i );
-      if ( plan.contains( " Scan(" ) ) {
-        scans.add( plan );
+    int n = getPlans().size();
+    for (int i = n-1; i >= 0;  i--) {
+      String plan = plans.get(i);
+      if (plan.contains(" Scan(")) {
+        scans.add(plan);
       }
     }
     return scans;
   }
 
-  public List<FieldDef> getColumns( String plan ) {
-    Pattern p = Pattern.compile( "RecordType\\((.*)\\):" );
+  public List<FieldDef> getColumns(String plan) {
+    Pattern p = Pattern.compile("RecordType\\((.*)\\):");
     Matcher m = p.matcher(plan);
-    if ( ! m.find() ) { return null; }
+    if (! m.find()) { return null; }
     String frag = m.group(1);
-    String parts[] = frag.split( ", " );
-    List<FieldDef> fields = new ArrayList<>( );
-    for ( String part : parts ) {
-      String halves[] = part.split( " " );
-      fields.add( new FieldDef( halves[1], halves[0] ) );
+    String parts[] = frag.split(", ");
+    List<FieldDef> fields = new ArrayList<>();
+    for (String part : parts) {
+      String halves[] = part.split(" ");
+      fields.add(new FieldDef(halves[1], halves[0]));
     }
     return fields;
   }
 
-  public Map<Integer,String> getOperators( ) {
+  public Map<Integer,String> getOperators() {
     Map<Integer,String> ops = new HashMap<>();
-    int n = getPlans( ).size();
-    Pattern p = Pattern.compile( "\\d+-(\\d+)\\s+(\\w+)" );
-    for ( int i = n-1; i >= 0;  i-- ) {
-      String plan = plans.get( i );
-      Matcher m = p.matcher( plan );
-      if ( ! m.find() ) { continue; }
+    int n = getPlans().size();
+    Pattern p = Pattern.compile("\\d+-(\\d+)\\s+(\\w+)");
+    for (int i = n-1; i >= 0;  i--) {
+      String plan = plans.get(i);
+      Matcher m = p.matcher(plan);
+      if (! m.find()) { continue; }
       int index = Integer.parseInt(m.group(1));
       String op = m.group(2);
       ops.put(index,op);
@@ -297,7 +328,7 @@ public class ProfileParser {
     return ops;
   }
 
-  public JsonArray getFragmentProfile( ) {
+  public JsonArray getFragmentProfile() {
     return profile.getJsonArray("fragmentProfile");
   }
 
@@ -309,21 +340,21 @@ public class ProfileParser {
   public static class FragInfo {
     public int baseLevel;
     public int id;
-    public List<OpDefInfo> ops = new ArrayList<>( );
-    public List<MinorFragInfo> minors = new ArrayList<>( );
+    public List<OperatorSummary> ops = new ArrayList<>();
+    public List<MinorFragInfo> minors = new ArrayList<>();
 
     public FragInfo(int majorId) {
       this.id = majorId;
     }
 
-    public OpDefInfo getRootOperator() {
+    public OperatorSummary getRootOperator() {
       return ops.get(0);
     }
 
     public void parse(JsonObject fragProfile) {
       JsonArray minorList = fragProfile.getJsonArray("minorFragmentProfile");
-      for ( JsonObject minorProfile : minorList.getValuesAs(JsonObject.class) ) {
-        minors.add( new MinorFragInfo(id, minorProfile) );
+      for (JsonObject minorProfile : minorList.getValuesAs(JsonObject.class)) {
+        minors.add(new MinorFragInfo(id, minorProfile));
       }
     }
   }
@@ -335,26 +366,35 @@ public class ProfileParser {
   public static class MinorFragInfo {
     public final int majorId;
     public final int id;
-    public final List<OperatorProfile> ops = new ArrayList<>( );
+    public final List<OperatorProfile> ops = new ArrayList<>();
 
     public MinorFragInfo(int majorId, JsonObject minorProfile) {
       this.majorId = majorId;
       id = minorProfile.getInt("minorFragmentId");
       JsonArray opList = minorProfile.getJsonArray("operatorProfile");
-      for ( JsonObject opProfile : opList.getValuesAs(JsonObject.class)) {
-        ops.add( new OperatorProfile( majorId, id, opProfile) );
+      for (JsonObject opProfile : opList.getValuesAs(JsonObject.class)) {
+        ops.add(new OperatorProfile(majorId, id, opProfile));
       }
     }
 
+    /**
+     * Map each operator execution profile back to the definition of that
+     * operator. The only common key is the xx-yy value where xx is the fragment
+     * number and yy is the operator ID.
+     *
+     * @param major major fragment that corresponds to the xx portion of the
+     * operator id
+     */
+
     public void mapOpProfiles(FragInfo major) {
       for (OperatorProfile op : ops) {
-        OpDefInfo opDef = major.ops.get(op.opId);
-        if ( opDef == null ) {
-          System.out.println( "Can't find operator def: " + major.id + "-" + op.opId);
+        OperatorSummary opDef = major.ops.get(op.opId);
+        if (opDef == null) {
+          System.out.println("Can't find operator def: " + major.id + "-" + op.opId);
           continue;
         }
         op.opName = CoreOperatorType.valueOf(op.type).name();
-//        System.out.println( major.id + "-" + id + "-" + opDef.stepId + " - Def: " + opDef.name + " / Prof: " + op.opName );
+//        System.out.println(major.id + "-" + id + "-" + opDef.stepId + " - Def: " + opDef.name + " / Prof: " + op.opName);
         op.opName = op.opName.replace("_", " ");
         op.name = opDef.name;
         if (op.name.equalsIgnoreCase(op.opName)) {
@@ -362,10 +402,10 @@ public class ProfileParser {
         }
         op.defn = opDef;
         opDef.opName = op.opName;
+        opDef.type = op.type;
         opDef.opExecs.add(op);
       }
     }
-
   }
 
   /**
@@ -375,7 +415,7 @@ public class ProfileParser {
    */
 
   public static class OperatorProfile {
-    public OpDefInfo defn;
+    public OperatorSummary defn;
     public String opName;
     public int majorFragId;
     public int minorFragId;
@@ -424,6 +464,13 @@ public class ProfileParser {
         return 0; }
       return ((JsonNumber) value).longValue();
     }
+
+    @Override
+    public String toString() {
+      return String.format("[OperatorProfile %02d-%02d-%02d, type: %d, name: %s]",
+          majorFragId, opId, minorFragId, type,
+          (name == null) ? "null" : name);
+    }
   }
 
   /**
@@ -432,9 +479,18 @@ public class ProfileParser {
    * "actuals" from the minor fragment portion of the profile.
    * Allows integrating the "planned" vs. "actual" performance of the
    * query.
+   * <p>
+   * There is one operator definition (represented here), each of which may
+   * give rise to multiple operator executions (housed in minor fragments.)
+   * The {@link #opExecs} field provides the list of operator executions
+   * (which provides access to operator metrics.)
    */
 
-  public static class OpDefInfo {
+  public static class OperatorSummary {
+    public int type;
+    public long processMs;
+    public long setupMs;
+    public int execCount;
     public String opName;
     public boolean isInferred;
     public int majorId;
@@ -456,9 +512,9 @@ public class ProfileParser {
     public long actualMemory;
     public int actualBatches;
     public long actualRows;
-    public OpDefInfo inferredParent;
-    public List<OperatorProfile> opExecs = new ArrayList<>( );
-    public List<OpDefInfo> children = new ArrayList<>( );
+    public OperatorSummary inferredParent;
+    public List<OperatorProfile> opExecs = new ArrayList<>();
+    public List<OperatorSummary> children = new ArrayList<>();
 
     // 00-00    Screen : rowType = RecordType(VARCHAR(10) Year, VARCHAR(65536) Month, VARCHAR(100) Devices, VARCHAR(100) Tier, VARCHAR(100) LOB, CHAR(10) Gateway, BIGINT Day, BIGINT Hour, INTEGER Week, VARCHAR(100) Week_end_date, BIGINT Usage_Cnt): \
     // rowcount = 100.0, cumulative cost = {7.42124276972414E9 rows, 7.663067406383167E10 cpu, 0.0 io, 2.24645048816E10 network, 2.692766612982188E8 memory}, id = 129302
@@ -466,11 +522,11 @@ public class ProfileParser {
     // 00-01      Project(Year=[$0], Month=[$1], Devices=[$2], Tier=[$3], LOB=[$4], Gateway=[$5], Day=[$6], Hour=[$7], Week=[$8], Week_end_date=[$9], Usage_Cnt=[$10]) :
     // rowType = RecordType(VARCHAR(10) Year, VARCHAR(65536) Month, VARCHAR(100) Devices, VARCHAR(100) Tier, VARCHAR(100) LOB, CHAR(10) Gateway, BIGINT Day, BIGINT Hour, INTEGER Week, VARCHAR(100) Week_end_date, BIGINT Usage_Cnt): rowcount = 100.0, cumulative cost = {7.42124275972414E9 rows, 7.663067405383167E10 cpu, 0.0 io, 2.24645048816E10 network, 2.692766612982188E8 memory}, id = 129301
 
-    public OpDefInfo(String plan) {
-      Pattern p = Pattern.compile( "^(\\d+)-(\\d+)(\\s+)(\\w+)(?:\\((.*)\\))?\\s*:\\s*(.*)$" );
+    public OperatorSummary(String plan) {
+      Pattern p = Pattern.compile("^(\\d+)-(\\d+)(\\s+)(\\w+)(?:\\((.*)\\))?\\s*:\\s*(.*)$");
       Matcher m = p.matcher(plan);
       if (!m.matches()) {
-        throw new IllegalStateException( "Could not parse plan: " + plan );
+        throw new IllegalStateException("Could not parse plan: " + plan);
       }
       majorId = Integer.parseInt(m.group(1));
       stepId = Integer.parseInt(m.group(2));
@@ -482,15 +538,15 @@ public class ProfileParser {
 
       p = Pattern.compile("rowType = RecordType\\((.*)\\): (rowcount .*)");
       m = p.matcher(tail);
-      if ( m.matches() ) {
+      if (m.matches()) {
         columns = parseCols(m.group(1));
         tail = m.group(2);
       }
 
-      p = Pattern.compile( "rowcount = ([\\d.E]+), cumulative cost = \\{([\\d.E]+) rows, ([\\d.E]+) cpu, ([\\d.E]+) io, ([\\d.E]+) network, ([\\d.E]+) memory\\}, id = (\\d+)");
+      p = Pattern.compile("rowcount = ([\\d.E]+), cumulative cost = \\{([\\d.E]+) rows, ([\\d.E]+) cpu, ([\\d.E]+) io, ([\\d.E]+) network, ([\\d.E]+) memory\\}, id = (\\d+)");
       m = p.matcher(tail);
       if (! m.matches()) {
-        throw new IllegalStateException("Could not parse costs: " + tail );
+        throw new IllegalStateException("Could not parse costs: " + tail);
       }
       estRows = Double.parseDouble(m.group(1));
       estRowCost = Double.parseDouble(m.group(2));
@@ -505,7 +561,7 @@ public class ProfileParser {
       new TreePrinter().visit(this);
     }
 
-    public OpDefInfo(int major, int id) {
+    public OperatorSummary(int major, int id) {
       majorId = major;
       stepId = id;
     }
@@ -513,7 +569,7 @@ public class ProfileParser {
     @Override
     public String toString() {
       String head = "[OpDefInfo " + majorId + "-" + stepId + ": " + name;
-      if ( isInferred ) {
+      if (isInferred) {
         head += " (" + opName + ")";
       }
       return head + "]";
@@ -527,36 +583,37 @@ public class ProfileParser {
 
   public static class TreeVisitor
   {
-    public void visit(OpDefInfo root) {
+    public void visit(OperatorSummary root) {
       visit(root, 0);
     }
-    public void visit(OpDefInfo node, int indent) {
-      visitOp( node, indent );
+
+    public void visit(OperatorSummary node, int indent) {
+      visitOp(node, indent);
       if (node.children.isEmpty()) {
         return;
       }
-      if ( node.children.size() == 1) {
+      if (node.children.size() == 1) {
         visit(node.children.get(0), indent);
         return;
       }
       indent++;
       int i = 0;
-      for (OpDefInfo child : node.children) {
+      for (OperatorSummary child : node.children) {
         visitSubtree(node, i++, indent);
         visit(child, indent+1);
       }
     }
 
-    protected void visitOp(OpDefInfo node, int indent) {
+    protected void visitOp(OperatorSummary node, int indent) {
     }
 
-    protected void visitSubtree(OpDefInfo node, int i, int indent) {
+    protected void visitSubtree(OperatorSummary node, int i, int indent) {
     }
 
     public String indentString(int indent, String pad) {
       StringBuilder buf = new StringBuilder();
       for (int i = 0; i < indent; i++) {
-        buf.append( pad );
+        buf.append(pad);
       }
       return buf.toString();
     }
@@ -565,7 +622,7 @@ public class ProfileParser {
       return indentString(indent, "  ");
     }
 
-    public String subtreeLabel(OpDefInfo node, int branch) {
+    public String subtreeLabel(OperatorSummary node, int branch) {
       if (node.name.equals("HashJoin")) {
         return (branch == 0) ? "Probe" : "Build";
       } else {
@@ -581,14 +638,14 @@ public class ProfileParser {
   public static class TreePrinter extends TreeVisitor
   {
     @Override
-    protected void visitOp(OpDefInfo node, int indent) {
-      System.out.print( indentString(indent) );
-      System.out.println( node.toString() );
+    protected void visitOp(OperatorSummary node, int indent) {
+      System.out.print(indentString(indent));
+      System.out.println(node.toString());
     }
 
     @Override
-    protected void visitSubtree(OpDefInfo node, int i, int indent) {
-      System.out.print( indentString(indent) );
+    protected void visitSubtree(OperatorSummary node, int i, int indent) {
+      System.out.print(indentString(indent));
       System.out.println(subtreeLabel(node, i));
     }
   }
@@ -610,30 +667,50 @@ public class ProfileParser {
   public static class CostPrinter extends TreeVisitor
   {
     @Override
-    protected void visitOp(OpDefInfo node, int indentLevel) {
+    protected void visitOp(OperatorSummary node, int indentLevel) {
       System.out.print(String.format("%02d-%02d ", node.majorId, node.stepId));
       String indent = indentString(indentLevel, ". ");
-      System.out.print( indent + node.name );
+      System.out.print(indent + node.name);
       if (node.opName != null) {
-        System.out.print( " (" + node.opName + ")" );
+        System.out.print(" (" + node.opName + ")");
       }
-      System.out.println( );
+      System.out.println();
       indent = indentString(15);
-      System.out.print( indent );
+      System.out.print(indent);
       System.out.println(String.format("  Estimate: %,15.0f rows, %,7.0f MB",
-                         node.estRows, node.estMemoryCost / 1024 / 1024) );
-      System.out.print( indent );
+                         node.estRows, node.estMemoryCost / 1024 / 1024));
+      System.out.print(indent);
       System.out.println(String.format("  Actual:   %,15d rows, %,7d MB",
                          node.actualRows, node.actualMemory / 1024 / 1024));
     }
 
     @Override
-    protected void visitSubtree(OpDefInfo node, int i, int indent) {
-      System.out.print( indentString(indent) + "      " );
+    protected void visitSubtree(OperatorSummary node, int i, int indent) {
+      System.out.print(indentString(indent) + "      ");
       System.out.println(subtreeLabel(node, i));
     }
   }
 
+  public static class FindOpVisitor extends TreeVisitor
+  {
+    private List<OperatorSummary> ops;
+    private int type;
+
+    public List<OperatorSummary> find(int type, OperatorSummary node) {
+      ops = new ArrayList<>();
+      this.type = type;
+      visit(node);
+      return ops;
+    }
+
+    @Override
+    protected void visitOp(OperatorSummary node, int indentLevel) {
+      if (node.type == type) {
+        ops.add(node);
+      }
+    }
+  }
+
   /**
    * We often run test queries single threaded to make analysis of the profile
    * easier. For a single-threaded (single slice) query, get a map from
@@ -643,14 +720,14 @@ public class ProfileParser {
    * @return
    */
 
-  public Map<Integer,OperatorProfile> getOpInfo( ) {
-    Map<Integer,String> ops = getOperators( );
-    Map<Integer,OperatorProfile> info = new HashMap<>( );
-    JsonArray frags = getFragmentProfile( );
+  public Map<Integer,OperatorProfile> getOpInfo() {
+    Map<Integer,String> ops = getOperators();
+    Map<Integer,OperatorProfile> info = new HashMap<>();
+    JsonArray frags = getFragmentProfile();
     JsonObject fragProfile = frags.getJsonObject(0).getJsonArray("minorFragmentProfile").getJsonObject(0);
     JsonArray opList = fragProfile.getJsonArray("operatorProfile");
-    for ( JsonObject opProfile : opList.getValuesAs(JsonObject.class) ) {
-      parseOpProfile( ops, info, opProfile );
+    for (JsonObject opProfile : opList.getValuesAs(JsonObject.class)) {
+      parseOpProfile(ops, info, opProfile);
     }
     return info;
   }
@@ -665,24 +742,123 @@ public class ProfileParser {
 
   public List<OperatorProfile> getOpsOfType(int type) {
     List<OperatorProfile> ops = new ArrayList<>();
-    Map<Integer,OperatorProfile> opMap = getOpInfo();
-    for (OperatorProfile op : opMap.values()) {
-      if (op.type == type) {
-        ops.add(op);
-      }
+    List<OperatorSummary> opDefs = getOpDefsOfType(type);
+    for (OperatorSummary opDef : opDefs) {
+      ops.addAll(opDef.opExecs);
     }
     return ops;
   }
 
+  public List<OperatorSummary> getOpDefsOfType(int type) {
+    return new FindOpVisitor().find(type, topoOrder.get(0));
+  }
+
   private void parseOpProfile(Map<Integer, String> ops,
       Map<Integer, OperatorProfile> info, JsonObject opProfile) {
-    OperatorProfile opInfo = new OperatorProfile( 0, 0, opProfile );
+    OperatorProfile opInfo = new OperatorProfile(0, 0, opProfile);
     opInfo.name = ops.get(opInfo.opId);
     info.put(opInfo.opId, opInfo);
   }
 
   public void printPlan() {
-    new CostPrinter().visit( topoOrder.get(0) );
+    new CostPrinter().visit(topoOrder.get(0));
+  }
+
+  public void printTime() {
+    new TimePrinter().visit(topoOrder.get(0));
+  }
+
+  public static class Aggregator extends TreeVisitor
+  {
+    protected int n;
+    protected long totalSetup;
+    protected long totalProcess;
+    protected long total;
+    protected int maxFrag;
+    protected boolean isTree;
+
+    @Override
+    public void visit(OperatorSummary root) {
+      super.visit(root, 0);
+      total = totalSetup + totalProcess;
+    }
+
+    @Override
+    protected void visitOp(OperatorSummary node, int indentLevel) {
+      n++;
+      totalSetup += node.setupMs;
+      totalProcess += node.processMs;
+      maxFrag = Math.max(maxFrag, node.majorId);
+      isTree |= (node.children.size() > 1);
+    }
+  }
+
+  public static class TimePrinter extends TreeVisitor
+  {
+    private Aggregator totals;
+    private boolean singleThread;
+    private boolean singleFragment;
+
+    @Override
+    public void visit(OperatorSummary root) {
+      totals = new Aggregator();
+      totals.visit(root);
+      singleThread = ! totals.isTree;
+      singleFragment = (totals.maxFrag == 0);
+      super.visit(root, 0);
+      System.out.println("Total:");
+      String indent = singleThread ? "  " : indentString(15);
+      System.out.print(indent);
+      System.out.println(String.format("Setup:   %,6d ms", totals.totalSetup));
+      System.out.print(indent);
+      System.out.println(String.format("Process: %,6d ms", totals.totalProcess));
+    }
+
+    @Override
+    protected void visitOp(OperatorSummary node, int indentLevel) {
+      if (singleThread) {
+        printSimpleFormat(node);
+      } else {
+        printTreeFormat(node, indentLevel);
+      }
+    }
+
+    private void printSimpleFormat(OperatorSummary node) {
+      if (singleFragment) {
+        System.out.print(String.format("%02d ", node.stepId));
+      } else {
+        System.out.print(String.format("%02d-%02d ", node.majorId, node.stepId));
+      }
+      System.out.print(node.name);
+      if (node.opName != null) {
+        System.out.print(" (" + node.opName + ")");
+      }
+      System.out.println();
+      printTimes(node, "  ");
+    }
+
+    private void printTimes(OperatorSummary node, String indent) {
+      System.out.print(indent);
+      System.out.println(String.format("Setup:   %,6d ms - %3d%%, %3d%%", node.setupMs,
+                         percent(node.setupMs, totals.totalSetup),
+                         percent(node.setupMs, totals.total)));
+      System.out.print(indent);
+      System.out.println(String.format("Process: %,6d ms - %3d%%, %3d%%", node.processMs,
+                         percent(node.processMs, totals.totalProcess),
+                         percent(node.processMs, totals.total)));
+    }
+
+    private void printTreeFormat(OperatorSummary node, int indentLevel) {
+      System.out.print(String.format("%02d-%02d ", node.majorId, node.stepId));
+      String indent = indentString(indentLevel, ". ");
+      System.out.print(indent + node.name);
+      if (node.opName != null) {
+        System.out.print(" (" + node.opName + ")");
+      }
+      System.out.println();
+      indent = indentString(15);
+      printTimes(node, indent);
+    }
   }
 
   /**
@@ -692,56 +868,60 @@ public class ProfileParser {
    */
 
   public void print() {
+    printTime();
+  }
+
+  public void simplePrint() {
     Map<Integer, OperatorProfile> opInfo = getOpInfo();
     int n = opInfo.size();
     long totalSetup = 0;
     long totalProcess = 0;
-    for ( int i = 0;  i <= n;  i++ ) {
+    for (int i = 0;  i <= n;  i++) {
       OperatorProfile op = opInfo.get(i);
-      if ( op == null ) { continue; }
+      if (op == null) { continue; }
       totalSetup += op.setupMs;
       totalProcess += op.processMs;
     }
     long total = totalSetup + totalProcess;
-    for ( int i = 0;  i <= n;  i++ ) {
+    for (int i = 0;  i <= n;  i++) {
       OperatorProfile op = opInfo.get(i);
-      if ( op == null ) { continue; }
-      System.out.print( "Op: " );
-      System.out.print( op.opId );
-      System.out.println( " " + op.name );
-      System.out.print( "  Setup:   " + op.setupMs );
-      System.out.print( " - " + percent(op.setupMs, totalSetup ) + "%" );
-      System.out.println( ", " + percent(op.setupMs, total ) + "%" );
-      System.out.print( "  Process: " + op.processMs );
-      System.out.print( " - " + percent(op.processMs, totalProcess ) + "%" );
-      System.out.println( ", " + percent(op.processMs, total ) + "%" );
+      if (op == null) { continue; }
+      System.out.print("Op: ");
+      System.out.print(op.opId);
+      System.out.println(" " + op.name);
+      System.out.print("  Setup:   " + op.setupMs);
+      System.out.print(" - " + percent(op.setupMs, totalSetup) + "%");
+      System.out.println(", " + percent(op.setupMs, total) + "%");
+      System.out.print("  Process: " + op.processMs);
+      System.out.print(" - " + percent(op.processMs, totalProcess) + "%");
+      System.out.println(", " + percent(op.processMs, total) + "%");
       if (op.type == 17) {
         long value = op.getMetric(0);
-        System.out.println( "  Spills: " + value );
+        System.out.println("  Spills: " + value);
       }
       if (op.waitMs > 0) {
-        System.out.println( "  Wait:    " + op.waitMs );
+        System.out.println("  Wait:    " + op.waitMs);
       }
-      if ( op.peakMem > 0) {
-        System.out.println( "  Memory: " + op.peakMem );
+      if (op.peakMem > 0) {
+        System.out.println("  Memory: " + op.peakMem);
       }
     }
-    System.out.println( "Total:" );
-    System.out.println( "  Setup:   " + totalSetup );
-    System.out.println( "  Process: " + totalProcess );
+    System.out.println("Total:");
+    System.out.println("  Setup:   " + totalSetup);
+    System.out.println("  Process: " + totalProcess);
   }
 
-  public static long percent( long value, long total ) {
-    if ( total == 0 ) {
+  public static long percent(long value, long total) {
+    if (total == 0) {
       return 0; }
-    return Math.round(value * 100 / total );
+    return Math.round(value * 100 / total);
   }
 
-  public List<OpDefInfo> getOpDefn(String target) {
-    List<OpDefInfo> ops = new ArrayList<>( );
-    for ( OpDefInfo opDef : operations ) {
-      if ( opDef.name.startsWith( target ) ) {
-        ops.add( opDef );
+  public List<OperatorSummary> getOpDefn(String target) {
+    List<OperatorSummary> ops = new ArrayList<>();
+    for (OperatorSummary opDef : operations) {
+      if (opDef.name.startsWith(target)) {
+        ops.add(opDef);
       }
     }
     return ops;
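
For readers following the ProfileParser changes above, here is a minimal
sketch of how the reworked API might be driven from a test. The profile
path, the ProfileParser(File) constructor usage, and the operator-type
constant 17 (the external sort, matching the "Spills" handling in
simplePrint() above) are assumptions for illustration, not part of this
commit.

    import java.io.File;
    import java.util.List;

    public class ProfileParserExample {
      public static void main(String[] args) throws Exception {
        // Assumption: the parser is built from a saved query profile (JSON).
        ProfileParser profile =
            new ProfileParser(new File("/tmp/drill/profiles/query1.sys.drill"));

        // New in this commit: per-operator setup/process times with percentages.
        profile.printTime();

        // Assumption: type 17 identifies the external sort operator.
        List<ProfileParser.OperatorProfile> sorts = profile.getOpsOfType(17);
        for (ProfileParser.OperatorProfile op : sorts) {
          System.out.println("Sort spills: " + op.getMetric(0));
        }
      }
    }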


[5/7] drill git commit: DRILL-5349: Fix TestParquetWriter unit tests when synchronous parquet reader is used.

Posted by am...@apache.org.
DRILL-5349: Fix TestParquetWriter unit tests when synchronous parquet reader is used.

close apache/drill#780


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1766ffc4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1766ffc4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1766ffc4

Branch: refs/heads/master
Commit: 1766ffc4960e8f7c1efc981a9302688a8c6cd427
Parents: 2f13c08
Author: Parth Chandra <pc...@maprtech.com>
Authored: Fri Mar 10 14:38:30 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Mon Mar 20 08:11:30 2017 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/store/parquet/columnreaders/PageReader.java | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1766ffc4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index e11fd65..8a783c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -254,6 +254,9 @@ class PageReader {
           this.parentColumnReader.parentReader.hadoopPath,
           this.parentColumnReader.columnDescriptor.toString(), start, 0, 0, timeToRead);
       timer.reset();
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+        readDictionaryPage(pageHeader, parentColumnReader);
+      }
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
     int compressedSize = pageHeader.getCompressed_page_size();
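
The three added lines change the header-scanning loop so that a dictionary
page is decoded as soon as it is seen, rather than skipped. A self-contained
sketch of that control flow follows; the types and helper names here are
illustrative stand-ins, not Drill's actual reader classes.

    import java.util.Arrays;
    import java.util.Iterator;

    class DictionaryPageLoop {
      enum PageType { DICTIONARY_PAGE, DATA_PAGE }

      // Hypothetical stand-in for a parquet page header.
      static class PageHeader {
        final PageType type;
        PageHeader(PageType type) { this.type = type; }
        PageType getType() { return type; }
      }

      // Mirrors the fixed loop: decode each dictionary page as it arrives,
      // then fall through to the first data page.
      static PageHeader nextDataPage(Iterator<PageHeader> pages) {
        PageHeader header;
        do {
          header = pages.next();
          if (header.getType() == PageType.DICTIONARY_PAGE) {
            readDictionaryPage(header);  // the added call; previously the page was skipped
          }
        } while (header.getType() == PageType.DICTIONARY_PAGE);
        return header;
      }

      static void readDictionaryPage(PageHeader h) {
        System.out.println("decoded dictionary page");
      }

      public static void main(String[] args) {
        Iterator<PageHeader> pages = Arrays.asList(
            new PageHeader(PageType.DICTIONARY_PAGE),
            new PageHeader(PageType.DATA_PAGE)).iterator();
        System.out.println("first data page: " + nextDataPage(pages).getType());
      }
    }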


[2/7] drill git commit: DRILL-5311: Check handshake result in C++ connector

Posted by am...@apache.org.
DRILL-5311: Check handshake result in C++ connector

In the C++ client connector, DrillClientImpl::recvHandshake always
returns success, even in case of a connection error (like a TCP
timeout). Only on the WIN32 platform would the error code be
checked.

Remove the WIN32-only restriction on the check, and add some logging.

close apache/drill#770


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/97e2a1d1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/97e2a1d1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/97e2a1d1

Branch: refs/heads/master
Commit: 97e2a1d1d79ae963e8e67067241398580fee7e14
Parents: 8656c83
Author: Laurent Goujon <la...@dremio.com>
Authored: Thu Mar 2 20:38:05 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Mon Mar 20 08:06:14 2017 -0700

----------------------------------------------------------------------
 contrib/native/client/src/clientlib/drillClientImpl.cpp | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/97e2a1d1/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index d768bf3..a41cb4b 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -274,11 +274,13 @@ connectionStatus_t DrillClientImpl::recvHandshake(){
     if(m_rbuf!=NULL){
         Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
     }
-#ifdef WIN32_SHUTDOWN_ON_TIMEOUT
+
     if (m_pError != NULL) {
+        DRILL_MT_LOG(DRILL_LOG(LOG_ERROR) << "DrillClientImpl::recvHandshake: failed to complete handshake with server."
+                    << m_pError->msg << "\n";)
         return static_cast<connectionStatus_t>(m_pError->status);
     }
-#endif // WIN32_SHUTDOWN_ON_TIMEOUT
+
     startHeartbeatTimer();
 
     return CONN_SUCCESS;


[7/7] drill git commit: DRILL-5359: Fix ClassCastException when Drill pushes down filter on the output of flatten operator.

Posted by am...@apache.org.
DRILL-5359: Fix ClassCastException when Drill pushes down filter on the output of flatten operator.

- Move findItemOrFlatten into DrillRelOptUtil as a static method.
- Exclude filter conditions if they contain an item/flatten operator.

close apache/drill#786


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/adbf363d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/adbf363d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/adbf363d

Branch: refs/heads/master
Commit: adbf363d286d548b324b55aa167b333addad8441
Parents: a8046be
Author: Jinfeng Ni <jn...@apache.org>
Authored: Thu Mar 16 14:44:35 2017 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Mon Mar 20 08:14:24 2017 -0700

----------------------------------------------------------------------
 .../exec/planner/common/DrillRelOptUtil.java    | 54 +++++++++++++++++++
 .../logical/DrillPushFilterPastProjectRule.java | 55 +++-----------------
 .../store/parquet/ParquetPushDownFilter.java    | 24 ++++++++-
 .../parquet/TestParquetFilterPushDown.java      | 14 +++++
 4 files changed, 98 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/adbf363d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 733577e..b3e261c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -29,11 +29,15 @@ import org.apache.calcite.rel.rules.ProjectRemoveRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.resolver.TypeCastRules;
@@ -169,4 +173,54 @@ public abstract class DrillRelOptUtil {
     }
     return true;
   }
+
+  /**
+   * Traverse a RexNode to find an item/flatten operator. Continue the search if the
+   * RexNode has a RexInputRef which refers to a RexNode in the project expressions.
+   *
+   * @param node : RexNode to search
+   * @param projExprs : the list of project expressions. An empty list means there is no project operator underneath.
+   * @return : null if there is none; otherwise the first appearance of an item/flatten RexCall.
+   */
+  public static RexCall findItemOrFlatten(
+      final RexNode node,
+      final List<RexNode> projExprs) {
+    try {
+      RexVisitor<Void> visitor =
+          new RexVisitorImpl<Void>(true) {
+            public Void visitCall(RexCall call) {
+              if ("item".equals(call.getOperator().getName().toLowerCase()) ||
+                  "flatten".equals(call.getOperator().getName().toLowerCase())) {
+                throw new Util.FoundOne(call); /* throw exception to interrupt tree walk (this is similar to
+                                              other utility methods in RexUtil.java) */
+              }
+              return super.visitCall(call);
+            }
+
+            public Void visitInputRef(RexInputRef inputRef) {
+              if (projExprs.size() == 0) {
+                return super.visitInputRef(inputRef);
+              } else {
+                final int index = inputRef.getIndex();
+                RexNode n = projExprs.get(index);
+                if (n instanceof RexCall) {
+                  RexCall r = (RexCall) n;
+                  if ("item".equals(r.getOperator().getName().toLowerCase()) ||
+                      "flatten".equals(r.getOperator().getName().toLowerCase())) {
+                    throw new Util.FoundOne(r);
+                  }
+                }
+
+                return super.visitInputRef(inputRef);
+              }
+            }
+          };
+      node.accept(visitor);
+      return null;
+    } catch (Util.FoundOne e) {
+      Util.swallow(e, null);
+      return (RexCall) e.getNode();
+    }
+  }
+
 }
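
The Util.FoundOne pattern above uses an exception purely as control flow, so
a deep expression-tree walk can stop at the first item/flatten match. A
self-contained sketch of the same idiom outside Calcite (all names below are
illustrative, not Drill or Calcite APIs):

    import java.util.Arrays;
    import java.util.List;

    class FirstMatchWalker {
      static class Node {
        final String op;
        final List<Node> children;
        Node(String op, Node... children) {
          this.op = op;
          this.children = Arrays.asList(children);
        }
      }

      // Control-flow exception carrying the first match, like Util.FoundOne.
      static class FoundOne extends RuntimeException {
        final Node node;
        FoundOne(Node node) { this.node = node; }
      }

      // Returns the first node whose operator matches, or null. The throw
      // short-circuits the rest of the traversal, as in findItemOrFlatten.
      static Node findFirst(Node root, String op) {
        try {
          visit(root, op);
          return null;
        } catch (FoundOne e) {
          return e.node;
        }
      }

      private static void visit(Node node, String op) {
        if (op.equalsIgnoreCase(node.op)) {
          throw new FoundOne(node);
        }
        for (Node child : node.children) {
          visit(child, op);
        }
      }

      public static void main(String[] args) {
        Node tree = new Node("and", new Node("=", new Node("item")), new Node(">"));
        System.out.println(findFirst(tree, "ITEM").op);  // prints "item"
      }
    }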

http://git-wip-us.apache.org/repos/asf/drill/blob/adbf363d/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
index 6591bfd..c2dbfb9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushFilterPastProjectRule.java
@@ -17,65 +17,24 @@
  */
 package org.apache.drill.exec.planner.logical;
 
-import java.util.List;
-
 import com.google.common.collect.Lists;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.rex.RexVisitor;
-import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.util.Util;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import java.util.List;
 
 public class DrillPushFilterPastProjectRule extends RelOptRule {
 
   public final static RelOptRule INSTANCE = new DrillPushFilterPastProjectRule();
 
-  private RexCall findItemOrFlatten(
-      final RexNode node,
-      final List<RexNode> projExprs) {
-    try {
-      RexVisitor<Void> visitor =
-          new RexVisitorImpl<Void>(true) {
-        public Void visitCall(RexCall call) {
-          if ("item".equals(call.getOperator().getName().toLowerCase()) ||
-            "flatten".equals(call.getOperator().getName().toLowerCase())) {
-            throw new Util.FoundOne(call); /* throw exception to interrupt tree walk (this is similar to
-                                              other utility methods in RexUtil.java */
-          }
-          return super.visitCall(call);
-        }
-
-        public Void visitInputRef(RexInputRef inputRef) {
-          final int index = inputRef.getIndex();
-          RexNode n = projExprs.get(index);
-          if (n instanceof RexCall) {
-            RexCall r = (RexCall) n;
-            if ("item".equals(r.getOperator().getName().toLowerCase()) ||
-                "flatten".equals(r.getOperator().getName().toLowerCase())) {
-              throw new Util.FoundOne(r);
-            }
-          }
-
-          return super.visitInputRef(inputRef);
-        }
-      };
-      node.accept(visitor);
-      return null;
-    } catch (Util.FoundOne e) {
-      Util.swallow(e, null);
-      return (RexCall) e.getNode();
-    }
-  }
-
   protected DrillPushFilterPastProjectRule() {
     super(
         operand(
@@ -99,7 +58,7 @@ public class DrillPushFilterPastProjectRule extends RelOptRule {
 
 
     for (final RexNode pred : predList) {
-      if (findItemOrFlatten(pred, projRel.getProjects()) == null) {
+      if (DrillRelOptUtil.findItemOrFlatten(pred, projRel.getProjects()) == null) {
         qualifiedPredList.add(pred);
       } else {
         unqualifiedPredList.add(pred);

http://git-wip-us.apache.org/repos/asf/drill/blob/adbf363d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 6f870f7..1ec10d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -17,16 +17,19 @@ package org.apache.drill.exec.store.parquet;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
@@ -36,6 +39,7 @@ import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
@@ -116,8 +120,26 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
       return;
     }
 
+    // Get the conjunctions of the filter condition. For each conjunction, if it refers to an ITEM or FLATTEN expression
+    // then it cannot be pushed down. Otherwise, it is qualified to be pushed down.
+    final List<RexNode> predList = RelOptUtil.conjunctions(condition);
+
+    final List<RexNode> qualifiedPredList = Lists.newArrayList();
+
+    for (final RexNode pred : predList) {
+      if (DrillRelOptUtil.findItemOrFlatten(pred, ImmutableList.<RexNode>of()) == null) {
+        qualifiedPredList.add(pred);
+      }
+    }
+
+    final RexNode qualifedPred = RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), qualifiedPredList, true);
+
+    if (qualifedPred == null) {
+      return;
+    }
+
     LogicalExpression conditionExp = DrillOptiq.toDrill(
-        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifedPred);
 
     Stopwatch timer = Stopwatch.createStarted();
     final GroupScan newGroupScan = groupScan.applyFilter(conditionExp,optimizerContext,
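
The pushdown change above splits the filter into conjuncts, keeps only those
free of item/flatten, and recomposes the rest. A minimal sketch of that
partitioning step over plain string predicates (illustrative only; the real
code works on RexNodes via RelOptUtil.conjunctions and
RexUtil.composeConjunction):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;

    class ConjunctPartition {
      // Keep only the conjuncts that qualify for pushdown; the caller bails
      // out (leaving the filter in place) when nothing qualifies.
      static List<String> qualifiedConjuncts(List<String> conjuncts,
                                             Predicate<String> pushable) {
        List<String> qualified = new ArrayList<>();
        for (String c : conjuncts) {
          if (pushable.test(c)) {
            qualified.add(c);
          }
        }
        return qualified;
      }

      public static void main(String[] args) {
        // "a AND b" arrives pre-split into conjuncts.
        List<String> conjuncts = Arrays.asList(
            "n_regionkey = 1",                  // pushable
            "flatten(cities).zip = '12345'");   // refers to flatten: not pushable
        List<String> qualified =
            qualifiedConjuncts(conjuncts, c -> !c.contains("flatten"));
        if (qualified.isEmpty()) {
          System.out.println("nothing to push down; keep the filter as-is");
        } else {
          System.out.println("push down: " + String.join(" AND ", qualified));
        }
      }
    }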

http://git-wip-us.apache.org/repos/asf/drill/blob/adbf363d/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index 1ad000e..782973d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -366,6 +366,20 @@ public class TestParquetFilterPushDown extends PlanTestBase {
     testParquetFilterPD(query3, 49, 2, false);
   }
 
+  @Test // DRILL-5359
+  public void testFilterWithItemFlatten() throws Exception {
+    final String sql = "select n_regionkey\n"
+        + "from (select n_regionkey, \n"
+        + "            flatten(nation.cities) as cities \n"
+        + "      from cp.`tpch/nation.parquet` nation) as flattenedCities \n"
+        + "where flattenedCities.cities.`zip` = '12345'";
+
+    final String[] expectedPlan = {"(?s)Filter.*Flatten"};
+    final String[] excludedPlan = {};
+
+    PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+
+  }
 
   //////////////////////////////////////////////////////////////////////////////////////////////////
   // Some test helper functions.