Posted to commits@drill.apache.org by ar...@apache.org on 2017/07/21 17:23:37 UTC

[1/5] drill git commit: DRILL-5616: Add memory checks, plus minor metrics changes

Repository: drill
Updated Branches:
  refs/heads/master 07346c782 -> a0c178bab


DRILL-5616: Add memory checks, plus minor metrics changes

closes #871
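
For context, the heart of this change is a before/after check on the operator's memory allocator around every call that may allocate: fetching the next incoming batch, the hash-table put(), and adding an aggregation batch. A minimal sketch of the pattern, simplified from the diff below (the helper and its parameters are illustrative; BufferAllocator is Drill's org.apache.drill.exec.memory.BufferAllocator):

    import org.apache.drill.exec.memory.BufferAllocator;

    // Measure what a call actually allocated, and grow the running
    // batch-size estimate if reality exceeded it (the estimate never shrinks).
    static long updatedBatchEstimate(BufferAllocator allocator,
                                     Runnable fetchNextBatch,
                                     long estMaxBatchSize) {
      long before = allocator.getAllocatedMemory();
      fetchNextBatch.run();                      // e.g. outgoing.next(0, incoming)
      long added = allocator.getAllocatedMemory() - before;
      return Math.max(estMaxBatchSize, added);
    }

The measured estimate then feeds the spill decision shown later in the diff.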


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/35a7ad00
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/35a7ad00
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/35a7ad00

Branch: refs/heads/master
Commit: 35a7ad00258ed4db2602b813c0ce35e34563725d
Parents: 07346c7
Author: Boaz Ben-Zvi <bo...@BBenZvi-E754-MBP13.local>
Authored: Mon Jul 10 17:53:28 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri Jul 21 16:32:05 2017 +0300

----------------------------------------------------------------------
 .../impl/aggregate/HashAggTemplate.java         | 116 +++++++++++--------
 .../exec/physical/impl/common/HashTable.java    |   6 +-
 .../physical/impl/common/HashTableTemplate.java |  42 +++----
 .../exec/physical/impl/join/HashJoinBatch.java  |   4 +-
 4 files changed, 81 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/35a7ad00/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 8393937..9f2c2fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -29,8 +29,6 @@ import javax.inject.Named;
 import com.google.common.base.Stopwatch;
 
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -176,7 +174,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     NUM_BUCKETS,
     NUM_ENTRIES,
     NUM_RESIZING,
-    RESIZING_TIME,
+    RESIZING_TIME_MS,
     NUM_PARTITIONS,
     SPILLED_PARTITIONS, // number of partitions spilled to disk
     SPILL_MB,         // Number of MB of data spilled to disk. This amount is first written,
@@ -194,7 +192,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
-
   public class BatchHolder {
 
     private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
@@ -401,7 +398,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     } else { // two phase
       // Adjust down the number of partitions if needed - when the memory available can not hold as
       // many batches (configurable option), plus overhead (e.g. hash table, links, hash values))
-      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 1024 * 1024) > memAvail ) {
         numPartitions /= 2;
         if ( numPartitions < 2) {
           if ( is2ndPhase ) {
@@ -562,21 +559,22 @@ public abstract class HashAggTemplate implements HashAggregator {
       // from one of the spill files (The spill case is handled differently here to avoid
       // collecting stats on the spilled records)
       //
+      long memAllocBeforeNext = allocator.getAllocatedMemory();
+
       if ( handlingSpills ) {
         outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
       } else {
-        long beforeAlloc = allocator.getAllocatedMemory();
-
         // Get the next RecordBatch from the incoming (i.e. upstream operator)
         outcome = outgoing.next(0, incoming);
-
-        // If incoming batch is bigger than our estimate - adjust the estimate to match
-        long afterAlloc = allocator.getAllocatedMemory();
-        long incomingBatchSize = afterAlloc - beforeAlloc;
-        if ( estMaxBatchSize < incomingBatchSize) {
-          logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
-          estMaxBatchSize = incomingBatchSize;
-        }
+      }
+      long memAllocAfterNext = allocator.getAllocatedMemory();
+      long incomingBatchSize = memAllocAfterNext - memAllocBeforeNext;
+
+      // If incoming batch is bigger than our estimate - adjust the estimate to match
+      if ( estMaxBatchSize < incomingBatchSize) {
+        logger.debug("Found a bigger next {} batch: {} , prior estimate was: {}, mem allocated {}",handlingSpills ? "spill" : "incoming",
+            incomingBatchSize, estMaxBatchSize, memAllocAfterNext);
+        estMaxBatchSize = incomingBatchSize;
       }
 
       if (EXTRA_DEBUG_1) {
@@ -843,7 +841,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
       int numOutputRecords = outNumRecordsHolder.value;
 
-      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput);
 
       // set the value count for outgoing batch value vectors
       /* int i = 0; */
@@ -967,7 +965,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
           allFlushed = true;
           this.outcome = IterOutcome.NONE;
-          if ( is2ndPhase ) {
+          if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
             stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
                 (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
           }
@@ -1025,7 +1023,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
     }
 
-    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value, numPendingOutput);
 
     // set the value count for outgoing batch value vectors
     for (VectorWrapper<?> v : outgoing) {
@@ -1096,15 +1094,16 @@ public abstract class HashAggTemplate implements HashAggregator {
   /**
    *  Generate a detailed error message in case of "Out Of Memory"
    * @return err msg
+   * @param prefix
    */
-  private String getOOMErrorMsg() {
+  private String getOOMErrorMsg(String prefix) {
     String errmsg;
     if ( !isTwoPhase ) {
       errmsg = "Single Phase Hash Aggregate operator can not spill." ;
     } else if ( ! canSpill ) {  // 2nd phase, with only 1 partition
       errmsg = "Too little memory available to operator to facilitate spilling.";
     } else { // a bug ?
-      errmsg = "OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+      errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
       ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches;
       if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
     }
@@ -1165,46 +1164,64 @@ public abstract class HashAggTemplate implements HashAggregator {
     int currentPartition = hashCode & partitionMask ;
     hashCode >>>= bitsInMask;
     HashTable.PutStatus putStatus = null;
-    long allocatedBefore = allocator.getAllocatedMemory();
+    long allocatedBeforeHTput = allocator.getAllocatedMemory();
 
+    // ==========================================
     // Insert the key columns into the hash table
+    // ==========================================
     try {
       putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
     } catch (OutOfMemoryException exc) {
-      throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+      throw new OutOfMemoryException(getOOMErrorMsg("HT was: " + allocatedBeforeHTput), exc); // may happen when can not spill
     } catch (SchemaChangeException e) {
       throw new UnsupportedOperationException("Unexpected schema change", e);
     }
-    int currentIdx = htIdxHolder.value;
 
-    long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
-    if ( addedMem > 0 ) {
-      logger.trace("MEMORY CHECK HT: allocated {}  added {} partition {}",allocatedBefore,addedMem,currentPartition);
-    }
+    boolean needToCheckIfSpillIsNeeded = false;
+    long allocatedBeforeAggCol = allocator.getAllocatedMemory();
 
-    // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch
-    // (for the aggregate columns) needs to be created
+    // Add an Aggr batch if needed:
+    //
+    //       In case put() added a new batch (for the keys) inside the hash table,
+    //       then a matching batch (for the aggregate columns) needs to be created
+    //
     if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
       try {
-        long allocatedBeforeAggCol = allocator.getAllocatedMemory();
 
         addBatchHolder(currentPartition);
 
         if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
-        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
-        logger.trace("MEMORY CHECK AGG: added {}  total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem);
+        long totalAddedMem = allocator.getAllocatedMemory() - allocatedBeforeHTput;
+        logger.trace("MEMORY CHECK AGG: allocated now {}, added {}  total (with HT) added {}", allocator.getAllocatedMemory(),
+            allocator.getAllocatedMemory() - allocatedBeforeAggCol, totalAddedMem);
         // resize the batch estimate if needed (e.g., varchars may take more memory than estimated)
-        if ( totalAddedMem > estMaxBatchSize ) {
-          logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem);
+        if (totalAddedMem > estMaxBatchSize) {
+          logger.trace("Adjusting Batch size estimate from {} to {}", estMaxBatchSize, totalAddedMem);
           estMaxBatchSize = totalAddedMem;
+          needToCheckIfSpillIsNeeded = true; // better check the memory limits again now
         }
       } catch (OutOfMemoryException exc) {
-        throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+        throw new OutOfMemoryException(getOOMErrorMsg("AGGR"), exc); // may happen when can not spill
       }
+    } else if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST ) {
+        // If a batch just became full (i.e. another batch would be allocated soon) -- then need to
+        // check (later, see below) if the memory limits are too close, and if so -- then spill !
+        plannedBatches++; // planning to allocate one more batch
+        needToCheckIfSpillIsNeeded = true;
+    } else if ( allocatedBeforeAggCol > allocatedBeforeHTput ) {
+        // if HT put() allocated memory (other than a new batch; e.g. HT doubling, or buffer resizing)
+        // then better check again whether a spill is needed
+        needToCheckIfSpillIsNeeded = true;
+
+        logger.trace("MEMORY CHECK HT: was allocated {}  added {} partition {}",allocatedBeforeHTput, allocatedBeforeAggCol - allocatedBeforeHTput,currentPartition);
     }
+
+    // =================================================================
+    // Locate the matching aggregate columns and perform the aggregation
+    // =================================================================
+    int currentIdx = htIdxHolder.value;
     BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK);
     int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
-
     if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
       numGroupedRecords++;
     }
@@ -1214,24 +1231,20 @@ public abstract class HashAggTemplate implements HashAggregator {
     // If exceeded, then need to spill (if 2nd phase) or output early (1st)
     // (Skip this if cannot spill; in such case an OOM may be encountered later)
     // ===================================================================================
-    if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) {
-
-      plannedBatches++; // planning to allocate one more batch
+    if ( needToCheckIfSpillIsNeeded && canSpill ) {
 
       // calculate the (max) new memory needed now
-      long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize
-      for ( HashTable ht : htables ) {
-        hashTableDoublingSizeNeeded += ht.extraMemoryNeededForResize();
-      }
-
-      // Plan ahead for at least MIN batches, to account for size changing, and some overhead
-      long maxMemoryNeeded = minBatchesPerPartition * plannedBatches *
-          ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) ) +
-          hashTableDoublingSizeNeeded;
+      // Plan ahead for at least MIN batches
+      long maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) *
+          ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) );
+      // Add the (max) size of the current hash table, in case it will double
+      int maxSize = 1;
+      for ( int insp = 0; insp < numPartitions; insp++) { maxSize = Math.max(maxSize, batchHolders[insp].size()); }
+      maxMemoryNeeded += MAX_BATCH_SIZE * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when %50 full, 4 - Uint4
 
       // log a detailed debug message explaining why a spill may be needed
-      logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
-          "Memory needed {}, Est batch size {}, mem limit {}",
+      logger.debug("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
+          "Max memory needed {}, Est batch size {}, mem limit {}",
           allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition,
           batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit);
       //
@@ -1301,8 +1314,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
     this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
     this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-    this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+    this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
     this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+    this.stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // Put 0 in case no spill
     if ( is2ndPhase ) {
       this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
     }
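
To make the new spill check concrete with illustrative numbers (assuming MAX_BATCH_SIZE is 64K rows, as in Drill's hash-table code): with minBatchesPerPartition = 3, plannedBatches = 1, estMaxBatchSize = 16 MB, and a largest partition holding maxSize = 4 batches, the reservation is 3 * 1 * (16 MB + 64K * 8 B) ≈ 49.5 MB for batches, plus 64K * 2 * 2 * 4 * 4 = 4 MB of headroom for a possible hash-table doubling; a spill (or early output in the first phase) is triggered once allocated memory plus that ~53.5 MB would cross the memory limit.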

http://git-wip-us.apache.org/repos/asf/drill/blob/35a7ad00/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 9c93c16..1e6570f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -64,8 +64,6 @@ public interface HashTable {
 
   public void getStats(HashTableStats stats);
 
-  public long extraMemoryNeededForResize();
-
   public int size();
 
   public boolean isEmpty();
@@ -78,9 +76,7 @@ public interface HashTable {
 
   public void setMaxVarcharSize(int size);
 
-  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords);
-
-  // public void addNewKeyBatch();
+  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
 }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/35a7ad00/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 3209c27..336026c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -326,7 +326,7 @@ public abstract class HashTableTemplate implements HashTable {
       hashValues = newHashValues;
     }
 
-    private boolean outputKeys(VectorContainer outContainer, int outStartIndex, int numRecords) {
+    private boolean outputKeys(VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords) {
 
       /** for debugging
       BigIntVector vv0 = getValueVector(0);
@@ -344,7 +344,16 @@ public abstract class HashTableTemplate implements HashTable {
         @SuppressWarnings("resource")
         ValueVector targetVV = outgoingIter.next().getValueVector();
         TransferPair tp = sourceVV.makeTransferPair(targetVV);
-        tp.splitAndTransfer(outStartIndex, numRecords);
+        if ( outStartIndex == 0 && numRecords == numExpectedRecords ) {
+          // The normal case: The whole column key(s) are transfered as is
+          tp.transfer();
+        } else {
+          // Transfer just the required section (does this ever happen ?)
+          // Requires an expensive allocation and copy
+          logger.debug("Performing partial output of keys, from index {}, num {} (out of {})",
+              outStartIndex,numRecords,numExpectedRecords);
+          tp.splitAndTransfer(outStartIndex, numRecords);
+        }
       }
 
 /*
@@ -510,21 +519,6 @@ public abstract class HashTableTemplate implements HashTable {
     return numResizing;
   }
 
-  /**
-   *
-   * @return Size of extra memory needed if the HT (i.e. startIndices) is doubled
-   */
-  @Override
-  public long extraMemoryNeededForResize() {
-    if (tableSize == MAXIMUM_CAPACITY) { return 0; } // will not resize
-    int newSize = roundUpToPowerOf2(2 * tableSize);
-
-    if (newSize > MAXIMUM_CAPACITY) {
-      newSize  = MAXIMUM_CAPACITY;
-    }
-    return newSize * 4 /* sizeof(int) */;
-  }
-
   @Override
   public int size() {
     return numEntries;
@@ -774,12 +768,9 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   @Override
-  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
+  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords) {
     assert batchIdx < batchHolders.size();
-    if (!batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords)) {
-      return false;
-    }
-    return true;
+    return batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords, numExpectedRecords);
   }
 
   private IntVector allocMetadataVector(int size, int initialValue) {
@@ -795,13 +786,6 @@ public abstract class HashTableTemplate implements HashTable {
   @Override
   public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
 
-/*  @Override
-  public void addNewKeyBatch() {
-    int numberOfBatches = batchHolders.size();
-    this.addBatchHolder();
-    freeIndex = numberOfBatches * BATCH_SIZE;
-  }
-*/
   // These methods will be code-generated in the context of the outer class
   protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
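
A note on the outputKeys() change above: TransferPair.transfer() hands the underlying buffers over wholesale without copying, whereas splitAndTransfer() must allocate new vectors and copy the requested slice. Passing numExpectedRecords lets the hash table recognize the common whole-batch case (outStartIndex == 0 and numRecords == numExpectedRecords) and take the zero-copy path, falling back to the expensive partial transfer only when a subsection is actually requested.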
 

http://git-wip-us.apache.org/repos/asf/drill/blob/35a7ad00/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 4af1664..72b8833 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -155,7 +155,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     NUM_BUCKETS,
     NUM_ENTRIES,
     NUM_RESIZING,
-    RESIZING_TIME;
+    RESIZING_TIME_MS;
 
     // duplicate for hash ag
 
@@ -526,7 +526,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
     stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
     stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-    stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+    stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
   }
 
   @Override


[5/5] drill git commit: DRILL-4720: Fix SchemaPartitionExplorer.getSubPartitions method implementations to return only Drill file system directories

Posted by ar...@apache.org.
DRILL-4720: Fix SchemaPartitionExplorer.getSubPartitions method implementations to return only Drill file system directories

1. Added file system util helper classes to standardize listing of directory and file statuses in Drill, with appropriate unit tests.
2. Fixed SchemaPartitionExplorer.getSubPartitions method implementations to return only directories that can be partitions according to Drill file system rules
(excluding all files and directories that start with a dot or an underscore); a usage sketch follows below.
3. Added a unit test for the directory explorer UDFs, with and without metadata cache presence.
4. Minor refactoring.

closes #864
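
A hedged usage sketch of the new helper (the path and the FileSystem handle are hypothetical; DrillFileSystemUtil and its dot/underscore filter are introduced in the new files below):

    import java.util.List;
    import org.apache.drill.exec.util.DrillFileSystemUtil;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Non-recursive listing of candidate partitions: only directories come
    // back, and anything whose name starts with '.' or '_' (hidden files,
    // _metadata cache files, etc.) is filtered out up front.
    FileSystem fs = ...; // e.g. a DrillFileSystem obtained from the schema
    List<FileStatus> partitions =
        DrillFileSystemUtil.listDirectories(fs, new Path("/data/table"), false);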


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a0c178ba
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a0c178ba
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a0c178ba

Branch: refs/heads/master
Commit: a0c178babb6d82a30af8fdf5e912cbc9c9526a85
Parents: 368bc38
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Thu Jun 29 16:08:33 2017 +0300
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri Jul 21 19:38:35 2017 +0300

----------------------------------------------------------------------
 .../planner/sql/handlers/ShowFileHandler.java   |  13 +-
 .../drill/exec/store/dfs/DrillFileSystem.java   |  36 +--
 .../drill/exec/store/dfs/DrillPathFilter.java   |  34 ---
 .../drill/exec/store/dfs/FileSelection.java     |  28 +--
 .../exec/store/dfs/FileSystemSchemaFactory.java |   3 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  17 +-
 .../exec/store/parquet/FooterGatherer.java      |  12 +-
 .../drill/exec/store/parquet/Metadata.java      |  36 +--
 .../exec/store/parquet/ParquetFormatPlugin.java |  14 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  16 +-
 .../store/sys/store/LocalPersistentStore.java   |  21 +-
 .../drill/exec/util/DrillFileSystemUtil.java    |  91 +++++++
 .../apache/drill/exec/util/FileSystemUtil.java  | 207 ++++++++++++++++
 .../exec/planner/TestDirectoryExplorerUDFs.java |  70 +++++-
 .../exec/util/DrillFileSystemUtilTest.java      | 158 ++++++++++++
 .../drill/exec/util/FileSystemUtilTest.java     | 240 +++++++++++++++++++
 .../drill/exec/util/FileSystemUtilTestBase.java | 112 +++++++++
 17 files changed, 942 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
index fb564a2..5e6af7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -33,6 +33,7 @@ import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.FileSystemUtil;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -50,8 +51,8 @@ public class ShowFileHandler extends DefaultSqlHandler {
 
     SqlIdentifier from = ((SqlShowFiles) sqlNode).getDb();
 
-    DrillFileSystem fs = null;
-    String defaultLocation = null;
+    DrillFileSystem fs;
+    String defaultLocation;
     String fromDir = "./";
 
     SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
@@ -93,9 +94,9 @@ public class ShowFileHandler extends DefaultSqlHandler {
 
     List<ShowFilesCommandResult> rows = new ArrayList<>();
 
-    for (FileStatus fileStatus : fs.list(false, new Path(defaultLocation, fromDir))) {
-      ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDir(),
-                                                                 !fileStatus.isDir(), fileStatus.getLen(),
+    for (FileStatus fileStatus : FileSystemUtil.listAll(fs, new Path(defaultLocation, fromDir), false)) {
+      ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDirectory(),
+                                                                 fileStatus.isFile(), fileStatus.getLen(),
                                                                  fileStatus.getOwner(), fileStatus.getGroup(),
                                                                  fileStatus.getPermission().toString(),
                                                                  fileStatus.getAccessTime(), fileStatus.getModificationTime());

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index e03cf22..52e1a96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -62,7 +62,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -75,8 +74,8 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
   private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
 
-  public static final String HIDDEN_FILE_PREFIX = "_";
-  public static final String DOT_FILE_PREFIX = ".";
+  public static final String UNDERSCORE_PREFIX = "_";
+  public static final String DOT_PREFIX = ".";
 
   private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
 
@@ -747,35 +746,6 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
     underlyingFs.removeXAttr(path, name);
   }
 
-  public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
-    if (recursive) {
-      List<FileStatus> statuses = Lists.newArrayList();
-      for (Path p : paths) {
-        addRecursiveStatus(underlyingFs.getFileStatus(p), statuses);
-      }
-      return statuses;
-
-    } else {
-      return Lists.newArrayList(underlyingFs.listStatus(paths));
-    }
-  }
-
-  private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
-    if (parent.isDir()) {
-      Path pattern = new Path(parent.getPath(), "*");
-      FileStatus[] sub = underlyingFs.globStatus(pattern, new DrillPathFilter());
-      for(FileStatus s : sub){
-        if (s.isDir()) {
-          addRecursiveStatus(s, listToFill);
-        } else {
-          listToFill.add(s);
-        }
-      }
-    } else {
-      listToFill.add(parent);
-    }
-  }
-
   public InputStream openPossiblyCompressedStream(Path path) throws IOException {
     CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
     if (codec != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
deleted file mode 100644
index 00f463d..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.dfs;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Utils;
-
-public class DrillPathFilter extends Utils.OutputFileUtils.OutputFilesFilter {
-  @Override
-  public boolean accept(Path path) {
-    if (path.getName().startsWith(DrillFileSystem.HIDDEN_FILE_PREFIX)) {
-      return false;
-    }
-    if (path.getName().startsWith(DrillFileSystem.DOT_FILE_PREFIX)) {
-      return false;
-    }
-    return super.accept(path);
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 3a89591..7682d69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -20,20 +20,15 @@ package org.apache.drill.exec.store.dfs;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -166,23 +161,16 @@ public class FileSelection {
       return this;
     }
     Stopwatch timer = Stopwatch.createStarted();
-    final List<FileStatus> statuses = getStatuses(fs);
-    final int total = statuses.size();
-    final Path[] paths = new Path[total];
-    for (int i=0; i<total; i++) {
-      paths[i] = statuses.get(i).getPath();
+    List<FileStatus> statuses = getStatuses(fs);
+
+    List<FileStatus> nonDirectories = Lists.newArrayList();
+    for (FileStatus status : statuses) {
+      nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true));
     }
-    final List<FileStatus> allStats = fs.list(true, paths);
-    final List<FileStatus> nonDirectories = Lists.newArrayList(Iterables.filter(allStats, new Predicate<FileStatus>() {
-      @Override
-      public boolean apply(@Nullable FileStatus status) {
-        return !status.isDirectory();
-      }
-    }));
 
     final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
     logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}",
-        timer.elapsed(TimeUnit.MILLISECONDS), total);
+        timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
 
     // fileSel will be null if we query an empty folder
     if (fileSel != null) {
@@ -425,7 +413,7 @@ public class FileSelection {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("root=" + this.selectionRoot);
+    sb.append("root=").append(this.selectionRoot);
 
     sb.append("files=[");
     boolean isFirst = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index e3e01c4..cf30162 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -95,7 +96,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{
                                             ) throws PartitionNotFoundException {
       List<FileStatus> fileStatuses;
       try {
-        fileStatuses = defaultSchema.getFS().list(false, new Path(defaultSchema.getDefaultLocation(), table));
+        fileStatuses = DrillFileSystemUtil.listDirectories(defaultSchema.getFS(), new Path(defaultSchema.getDefaultLocation(), table), false);
       } catch (IOException e) {
         throw new PartitionNotFoundException("Error finding partitions for table " + table, e);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 8416ed8..b2798a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -64,6 +64,7 @@ import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -404,7 +405,7 @@ public class WorkspaceSchemaFactory {
 
       List<FileStatus> fileStatuses;
       try {
-        fileStatuses = getFS().list(false, new Path(getDefaultLocation(), table));
+        fileStatuses = DrillFileSystemUtil.listDirectories(getFS(), new Path(getDefaultLocation(), table), false);
       } catch (IOException e) {
         throw new PartitionNotFoundException("Error finding partitions for table " + table, e);
       }
@@ -639,12 +640,12 @@ public class WorkspaceSchemaFactory {
     }
 
     /**
-     * Check if the table contains homogenenous files that can be read by Drill. Eg: parquet, json csv etc.
+     * Check if the table contains homogeneous files that can be read by Drill. Eg: parquet, json, csv etc.
      * However if it contains more than one of these formats or a totally different file format that Drill cannot
      * understand then we will raise an exception.
-     * @param tableName - name of the table to be checked for homogeneous property
-     * @return
-     * @throws IOException
+     * @param tableName name of the table to be checked for homogeneous property
+     * @return true if table contains homogeneous files, false otherwise
+     * @throws IOException in case of problems accessing table files
      */
     private boolean isHomogeneous(String tableName) throws IOException {
       FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), tableName);
@@ -663,7 +664,7 @@ public class WorkspaceSchemaFactory {
       while (!listOfFiles.isEmpty()) {
         FileStatus currentFile = listOfFiles.poll();
         if (currentFile.isDirectory()) {
-          listOfFiles.addAll(fs.list(true, currentFile.getPath()));
+          listOfFiles.addAll(DrillFileSystemUtil.listFiles(fs, currentFile.getPath(), true));
         } else {
           if (matcher != null) {
             if (!matcher.isFileReadable(fs, currentFile)) {
@@ -709,7 +710,7 @@ public class WorkspaceSchemaFactory {
         long time =  (System.currentTimeMillis()/1000);
         Long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
         Long p2 = r.nextLong();
-        final String fileNameDelimiter = DrillFileSystem.HIDDEN_FILE_PREFIX;
+        final String fileNameDelimiter = DrillFileSystem.UNDERSCORE_PREFIX;
         String[] pathSplit = table.split(Path.SEPARATOR);
         /*
          * Builds the string for the renamed table
@@ -718,7 +719,7 @@ public class WorkspaceSchemaFactory {
          * separated by underscores
          */
         tableRenameBuilder
-            .append(DrillFileSystem.HIDDEN_FILE_PREFIX)
+            .append(DrillFileSystem.UNDERSCORE_PREFIX)
             .append(pathSplit[pathSplit.length - 1])
             .append(fileNameDelimiter)
             .append(p1.toString())

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index b68ffbb..3ba6ff0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.exec.store.TimedRunnable;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,10 +68,10 @@ public class FooterGatherer {
   public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
     final List<TimedRunnable<Footer>> readers = Lists.newArrayList();
     List<Footer> foundFooters = Lists.newArrayList();
-    for(FileStatus status : statuses){
+    for (FileStatus status : statuses) {
 
 
-      if(status.isDirectory()){
+      if (status.isDirectory()){
         // first we check for summary file.
         FileSystem fs = status.getPath().getFileSystem(conf);
 
@@ -83,10 +83,10 @@ public class FooterGatherer {
         }
 
         // else we handle as normal file.
-        for(FileStatus inStatus : fs.listStatus(status.getPath(), new DrillPathFilter())){
+        for (FileStatus inStatus : DrillFileSystemUtil.listFiles(fs, status.getPath(), false)){
           readers.add(new FooterReader(conf, inStatus));
         }
-      }else{
+      } else {
         readers.add(new FooterReader(conf, status));
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 0a4ce60..d9b99f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -31,7 +31,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.TimedRunnable;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.BlockLocation;
@@ -179,7 +179,7 @@ public class Metadata {
 
     final List<FileStatus> childFiles = Lists.newArrayList();
 
-    for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
+    for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
       if (file.isDirectory()) {
         ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString())).getLeft();
         metaDataList.addAll(subTableMetadata.files);
@@ -233,17 +233,22 @@ public class Metadata {
   }
 
   /**
-   * Get the parquet metadata for the parquet files in a directory
+   * Get the parquet metadata for the parquet files in a directory.
    *
    * @param path the path of the directory
-   * @return
-   * @throws IOException
+   * @return metadata object for an entire parquet directory structure
+   * @throws IOException in case of problems while accessing files
    */
   private ParquetTableMetadata_v3 getParquetTableMetadata(String path) throws IOException {
     Path p = new Path(path);
     FileStatus fileStatus = fs.getFileStatus(p);
     final Stopwatch watch = Stopwatch.createStarted();
-    List<FileStatus> fileStatuses = getFileStatuses(fileStatus);
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    if (fileStatus.isFile()) {
+      fileStatuses.add(fileStatus);
+    } else {
+      fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, p, true));
+    }
     logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
     watch.reset();
     watch.start();
@@ -290,25 +295,6 @@ public class Metadata {
   }
 
   /**
-   * Recursively get a list of files
-   *
-   * @param fileStatus
-   * @return
-   * @throws IOException
-   */
-  private List<FileStatus> getFileStatuses(FileStatus fileStatus) throws IOException {
-    List<FileStatus> statuses = Lists.newArrayList();
-    if (fileStatus.isDirectory()) {
-      for (FileStatus child : fs.listStatus(fileStatus.getPath(), new DrillPathFilter())) {
-        statuses.addAll(getFileStatuses(child));
-      }
-    } else {
-      statuses.add(fileStatus);
-    }
-    return statuses;
-  }
-
-  /**
    * TimedRunnable that reads the footer from parquet and collects file metadata
    */
   private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v3> {

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 0eb4665..3f331b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,10 +41,10 @@ import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -56,7 +56,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 
@@ -260,13 +259,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
           if (metaDataFileExists(fs, dir)) {
             return true;
           }
-          PathFilter filter = new DrillPathFilter();
-
-          FileStatus[] files = fs.listStatus(dir.getPath(), filter);
-          if (files.length == 0) {
-            return false;
-          }
-          return super.isFileReadable(fs, files[0]);
+          List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, dir.getPath(), false);
+          return !statuses.isEmpty() && super.isFileReadable(fs, statuses.get(0));
         }
       } catch (IOException e) {
         logger.info("Failure while attempting to check for Parquet metadata file.", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 3d9cfb3..30f607d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -57,8 +57,8 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ImplicitColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.store.dfs.MetadataContext.PruneStatus;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
@@ -743,7 +743,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       } else {
         final List<FileStatus> fileStatuses = Lists.newArrayList();
         for (ReadEntryWithPath entry : entries) {
-          getFiles(entry.getPath(), fileStatuses);
+          fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true));
         }
         parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses, formatConfig);
       }
@@ -857,18 +857,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return this.endpointAffinities;
   }
 
-  private void getFiles(String path, List<FileStatus> fileStatuses) throws IOException {
-    Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
-    FileStatus fileStatus = fs.getFileStatus(p);
-    if (fileStatus.isDirectory()) {
-      for (FileStatus f : fs.listStatus(p, new DrillPathFilter())) {
-        getFiles(f.getPath().toString(), fileStatuses);
-      }
-    } else {
-      fileStatuses.add(fileStatus);
-    }
-  }
-
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
index dc4c414..320a864 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
@@ -39,6 +39,7 @@ import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.VersionMismatchException;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.sys.BasePersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreMode;
@@ -51,6 +52,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.PathFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,17 +116,22 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
   public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
     try (AutoCloseableLock lock = readLock.open()) {
       try {
-        List<FileStatus> f = fs.list(false, basePath);
-        if (f == null || f.isEmpty()) {
+        // list only files with sys file suffix
+        PathFilter sysFileSuffixFilter = new PathFilter() {
+          @Override
+          public boolean accept(Path path) {
+            return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX);
+          }
+        };
+        List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter);
+        if (fileStatuses.isEmpty()) {
           return Collections.emptyIterator();
         }
-        List<String> files = Lists.newArrayList();
 
-        for (FileStatus stat : f) {
+        List<String> files = Lists.newArrayList();
+        for (FileStatus stat : fileStatuses) {
           String s = stat.getPath().getName();
-          if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
-            files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
-          }
+          files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
         }
 
         Collections.sort(files);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
new file mode 100644
index 0000000..56d9385
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * In the Drill file system, all directories and files that start with a dot or an underscore are ignored.
+ * This helper class delegates all work of listing directory and file statuses to the {@link org.apache.drill.exec.util.FileSystemUtil} class,
+ * only adding the Drill file system filter first.
+ */
+public class DrillFileSystemUtil {
+
+  /**
+   * Path filter that skips all files and folders that start with dot or underscore.
+   */
+  public static final PathFilter DRILL_SYSTEM_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return !path.getName().startsWith(DrillFileSystem.UNDERSCORE_PREFIX) && !path.getName().startsWith(DrillFileSystem.DOT_PREFIX);
+    }
+  };
+
+  /**
+   * Returns statuses of all directories present in given path applying custom filters if present.
+   * Directories that start with dot or underscore are skipped.
+   * Will also include nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to directory
+   * @param recursive true if nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory statuses
+   */
+  public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return FileSystemUtil.listDirectories(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
+  }
+
+  /**
+   * Returns statuses of all files present in the given path, applying custom filters if provided.
+   * Files and nested directories that start with dot or underscore are skipped.
+   * Will also include files from nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if files in nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching file statuses
+   */
+  public static List<FileStatus> listFiles(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return FileSystemUtil.listFiles(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
+  }
+
+  /**
+   * Returns statuses of all directories and files present in the given path, applying custom filters if provided.
+   * Directories and files that start with dot or underscore are skipped.
+   * Will also include nested directories and their files if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if nested directories and their files should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory and file statuses
+   */
+  public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    return FileSystemUtil.listAll(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
+  }
+
+}
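
For orientation, a minimal usage sketch of the new helper (illustrative only:
the fs and basePath variables are assumed to exist in the caller, and the
".parquet" filter is invented for the example; the imports match those shown
in the file above):

    // Lists *.parquet files under basePath, including nested directories.
    // DRILL_SYSTEM_FILTER is merged in automatically, so dot- and
    // underscore-prefixed entries are excluded before the custom filter runs.
    List<FileStatus> parquetFiles = DrillFileSystemUtil.listFiles(fs, basePath, true,
        new PathFilter() {
          @Override
          public boolean accept(Path path) {
            return path.getName().endsWith(".parquet");
          }
        });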

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
new file mode 100644
index 0000000..84b22b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Helper class that provides methods to list statuses of directories, files, or both.
+ * Can list statuses recursively and apply custom filters.
+ */
+public class FileSystemUtil {
+
+  /**
+   * Filter that will accept all files and directories.
+   */
+  public static final PathFilter DUMMY_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return true;
+    }
+  };
+
+  /**
+   * Returns statuses of all directories present in the given path, applying custom filters if provided.
+   * Will also include nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to directory
+   * @param recursive true if nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory statuses
+   */
+  public static List<FileStatus> listDirectories(final FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    List<FileStatus> statuses = new ArrayList<>();
+    listDirectories(fs, path, recursive, statuses, mergeFilters(filters));
+    return statuses;
+  }
+
+  /**
+   * Returns statuses of all files present in the given path, applying custom filters if provided.
+   * Will also include files from nested directories if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if files in nested directories should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching file statuses
+   */
+  public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    List<FileStatus> statuses = new ArrayList<>();
+    listFiles(fs, path, recursive, statuses, mergeFilters(filters));
+    return statuses;
+  }
+
+  /**
+   * Returns statuses of all directories and files present in the given path, applying custom filters if provided.
+   * Will also include nested directories and their files if recursive flag is set to true.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if nested directories and their files should be included
+   * @param filters list of custom filters (optional)
+   * @return list of matching directory and file statuses
+   */
+  public static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, PathFilter... filters) throws IOException {
+    List<FileStatus> statuses = new ArrayList<>();
+    listAll(fs, path, recursive, statuses, mergeFilters(filters));
+    return statuses;
+  }
+
+  /**
+   * Merges given filter with array of filters.
+   * If array of filters is null or empty, will return given filter.
+   *
+   * @param filter given filter
+   * @param filters array of filters
+   * @return one filter that combines all given filters
+   */
+  public static PathFilter mergeFilters(PathFilter filter, PathFilter[] filters) {
+    if (filters == null || filters.length == 0) {
+      return filter;
+    }
+
+    int length = filters.length;
+    PathFilter[] newFilters = Arrays.copyOf(filters, length + 1);
+    newFilters[length] = filter;
+    return mergeFilters(newFilters);
+  }
+
+  /**
+   * Will merge given array of filters into one.
+   * If given array of filters is empty, will return {@link #DUMMY_FILTER}.
+   *
+   * @param filters array of filters
+   * @return one filter that combines all given filters
+   */
+  public static PathFilter mergeFilters(final PathFilter... filters) {
+    if (filters.length == 0) {
+      return DUMMY_FILTER;
+    }
+
+    return new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        for (PathFilter filter : filters) {
+          if (!filter.accept(path)) {
+            return false;
+          }
+        }
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Helper method that stores in the given holder the statuses of all directories present in the given path, applying the custom filter.
+   * If recursive flag is set to true, will call itself recursively to add statuses of nested directories.
+   *
+   * @param fs current file system
+   * @param path path to directory
+   * @param recursive true if nested directories should be included
+   * @param statuses holder for directory statuses
+   * @param filter custom filter
+   * @return holder with all matching directory statuses
+   */
+  private static List<FileStatus> listDirectories(FileSystem fs, Path path, boolean recursive, List<FileStatus> statuses, PathFilter filter) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(path, filter);
+    for (FileStatus status: fileStatuses) {
+      if (status.isDirectory()) {
+        statuses.add(status);
+        if (recursive) {
+          listDirectories(fs, status.getPath(), true, statuses, filter);
+        }
+      }
+    }
+    return statuses;
+  }
+
+  /**
+   * Helper method that stores in the given holder the statuses of all files present in the given path, applying the custom filter.
+   * If recursive flag is set to true, will call itself recursively to add file statuses from nested directories.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if files in nested directories should be included
+   * @param statuses holder for file statuses
+   * @param filter custom filter
+   * @return holder with all matching file statuses
+   */
+  private static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recursive, List<FileStatus> statuses, PathFilter filter) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(path, filter);
+    for (FileStatus status: fileStatuses) {
+      if (status.isDirectory()) {
+        if (recursive) {
+          listFiles(fs, status.getPath(), true, statuses, filter);
+        }
+      } else {
+        statuses.add(status);
+      }
+    }
+    return statuses;
+  }
+
+  /**
+   * Helper method that stores in the given holder the statuses of all directories and files present in the given path, applying the custom filter.
+   * If recursive flag is set to true, will call itself recursively to add nested directories and their file statuses.
+   *
+   * @param fs current file system
+   * @param path path to file or directory
+   * @param recursive true if nested directories and their files should be included
+   * @param statuses holder for directory and file statuses
+   * @param filter custom filter
+   * @return holder with all matching directory and file statuses
+   */
+  private static List<FileStatus> listAll(FileSystem fs, Path path, boolean recursive, List<FileStatus> statuses, PathFilter filter) throws IOException {
+    for (FileStatus status: fs.listStatus(path, filter)) {
+      statuses.add(status);
+      if (status.isDirectory() && recursive) {
+        listAll(fs, status.getPath(), true, statuses, filter);
+      }
+    }
+    return statuses;
+  }
+}
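
The merged filter uses logical AND semantics: a path is accepted only when
every underlying filter accepts it. A short sketch (both filters are invented
for the example; the unit tests added below exercise the same behavior):

    PathFilter startsWithA = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        return path.getName().startsWith("a");
      }
    };
    PathFilter txtOnly = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        return path.getName().endsWith(".txt");
      }
    };
    PathFilter merged = FileSystemUtil.mergeFilters(startsWithA, txtOnly);
    boolean acceptsTxt = merged.accept(new Path("abc.txt"));  // true: both filters accept
    boolean acceptsCsv = merged.accept(new Path("abc.csv"));  // false: txtOnly rejects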

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
index a5916a5..4458d58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/TestDirectoryExplorerUDFs.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -14,20 +14,26 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- ******************************************************************************/
+*/
 package org.apache.drill.exec.planner;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.TestUtilities;
 import org.apache.drill.exec.util.Text;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -188,4 +194,64 @@ public class TestDirectoryExplorerUDFs extends PlanTestBase {
     }
   }
 
+  @Test // DRILL-4720
+  public void testDirectoryUDFsWithAndWithoutMetadataCache() throws Exception {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(new Configuration());
+
+      // prepare test table with partitions
+      Path table = new Path(getTempDir("table_with_partitions"));
+      String tablePath = table.toUri().getPath();
+      Path dataFile = new Path(TestTools.getWorkingPath(), "src/test/resources/parquet/alltypes_required.parquet");
+      createPartitions(fs, table, dataFile, 2);
+
+      Map<String, String> configurations = ImmutableMap.<String, String>builder()
+          .put("mindir", "part_1")
+          .put("imindir", "part_1")
+          .put("maxdir", "part_2")
+          .put("imaxdir", "part_2")
+          .build();
+
+      String query = "select dir0 from dfs.`%s` where dir0 = %s('dfs', '%s') limit 1";
+
+      // run tests without metadata cache
+      for (Map.Entry<String, String> entry : configurations.entrySet()) {
+        testBuilder()
+            .sqlQuery(query, tablePath, entry.getKey(), tablePath)
+            .unOrdered()
+            .baselineColumns("dir0")
+            .baselineValues(entry.getValue())
+            .go();
+      }
+
+      // generate metadata
+      test("refresh table metadata dfs.`%s`", tablePath);
+
+      // run tests with metadata cache
+      for (Map.Entry<String, String> entry : configurations.entrySet()) {
+        testBuilder()
+            .sqlQuery(query, tablePath, entry.getKey(), tablePath)
+            .unOrdered()
+            .baselineColumns("dir0")
+            .baselineValues(entry.getValue())
+            .go();
+      }
+
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+    }
+  }
+
+  private void createPartitions(FileSystem fs, Path table, Path dataFile, int number) throws IOException {
+    for (int i = 1; i <= number; i++) {
+      Path partition = new Path(table, "part_" + i);
+      fs.mkdirs(partition);
+      FileUtil.copy(fs, dataFile, fs, partition, false, true, fs.getConf());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java
new file mode 100644
index 0000000..e26c5c6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/DrillFileSystemUtilTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class DrillFileSystemUtilTest extends FileSystemUtilTestBase {
+
+  @Test
+  public void testListDirectoriesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, false);
+    assertEquals("Directory count should match", 2, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 1, statuses.size());
+    assertEquals("Directory name should match", "a", statuses.get(0).getPath().getName());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, true);
+    assertEquals("Directory count should match", 3, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listDirectories(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", "a", statuses.get(0).getPath().getName());
+    assertEquals("Directory name should match", "aa", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, new Path(base, "a"), false);
+    assertEquals("File count should match", 1, statuses.size());
+    assertEquals("File name should match", "f.txt", statuses.get(0).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("File count should match", 1, statuses.size());
+    assertEquals("File name should match", "f.txt", statuses.get(0).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, base, true);
+    assertEquals("File count should match", 3, statuses.size());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listFiles(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("File count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("File name should match", "f.txt", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListAllWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, new Path(base, "a"), false);
+    assertEquals("File count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("File name should match", "aa", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListAllWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 2, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", "aa", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+  }
+
+  @Test
+  public void testListAllRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, base, true);
+    assertEquals("Directory and file count should match", 6, statuses.size());
+  }
+
+  @Test
+  public void testListAllRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = DrillFileSystemUtil.listAll(fs, new Path(base, "a"), true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 3, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", "aa", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(1).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(2).getPath().getName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
new file mode 100644
index 0000000..47883e4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FileSystemUtilTest extends FileSystemUtilTestBase {
+
+  @Test
+  public void testListDirectoriesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, false);
+    assertEquals("Directory count should match", 4, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 3, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", ".a", statuses.get(0).getPath().getName());
+    assertEquals("Directory name should match", "_a", statuses.get(1).getPath().getName());
+    assertEquals("Directory name should match", "a", statuses.get(2).getPath().getName());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, true);
+    assertEquals("Directory count should match", 5, statuses.size());
+  }
+
+  @Test
+  public void testListDirectoriesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a");
+      }
+    });
+    assertEquals("Directory count should match", 4, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("Directory name should match", ".a", statuses.get(0).getPath().getName());
+    assertEquals("Directory name should match", "_a", statuses.get(1).getPath().getName());
+    assertEquals("Directory name should match", "a", statuses.get(2).getPath().getName());
+    assertEquals("Directory name should match", "aa", statuses.get(3).getPath().getName());
+  }
+
+  @Test
+  public void testListDirectoriesEmptyResult() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listDirectories(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("abc");
+      }
+    });
+    assertEquals("Directory count should match", 0, statuses.size());
+  }
+
+  @Test
+  public void testListFilesWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, new Path(base, "a"), false);
+    assertEquals("File count should match", 3, statuses.size());
+  }
+
+  @Test
+  public void testListFilesWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("File count should match", 3, statuses.size());
+
+    Collections.sort(statuses);
+    assertEquals("File name should match", ".f.txt", statuses.get(0).getPath().getName());
+    assertEquals("File name should match", "_f.txt", statuses.get(1).getPath().getName());
+    assertEquals("File name should match", "f.txt", statuses.get(2).getPath().getName());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, base, true);
+    assertEquals("File count should match", 11, statuses.size());
+  }
+
+  @Test
+  public void testListFilesRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, base, true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+
+    assertEquals("File count should match", 8, statuses.size());
+  }
+
+  @Test
+  public void testListFilesEmptyResult() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listFiles(fs, base, false);
+    assertEquals("File count should match", 0, statuses.size());
+  }
+
+  @Test
+  public void testListAllWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), false);
+    assertEquals("Directory and file count should match", 4, statuses.size());
+  }
+
+  @Test
+  public void testListAllWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 4, statuses.size());
+  }
+
+  @Test
+  public void testListAllRecursiveWithoutFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), true);
+    assertEquals("Directory and file count should match", 7, statuses.size());
+  }
+
+  @Test
+  public void testListAllRecursiveWithFilter() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, new Path(base, "a"), true, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith("a") || path.getName().endsWith(".txt");
+      }
+    });
+    assertEquals("Directory and file count should match", 7, statuses.size());
+  }
+
+  @Test
+  public void testListAllEmptyResult() throws IOException {
+    List<FileStatus> statuses = FileSystemUtil.listAll(fs, base, false, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("xyz");
+      }
+    });
+    assertEquals("Directory and file count should match", 0, statuses.size());
+  }
+
+  @Test
+  public void testMergeFiltersWithMissingParameters() {
+    PathFilter filter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a");
+      }
+    };
+
+    assertEquals("Should have returned initial filter", filter, FileSystemUtil.mergeFilters(filter, null));
+    assertEquals("Should have returned initial filter", filter, FileSystemUtil.mergeFilters(filter, new PathFilter[]{}));
+    assertEquals("Should have returned dummy filter", FileSystemUtil.DUMMY_FILTER, FileSystemUtil.mergeFilters());
+  }
+
+  @Test
+  public void mergeFiltersTrue() {
+    Path file = new Path("abc.txt");
+
+    PathFilter firstFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a");
+      }
+    };
+
+    PathFilter secondFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".txt");
+      }
+    };
+
+    assertTrue("Path should have been included in the path list", FileSystemUtil.mergeFilters(firstFilter, secondFilter).accept(file));
+    assertTrue("Path should have been included in the path list", FileSystemUtil.mergeFilters(firstFilter, new PathFilter[] {secondFilter}).accept(file));
+  }
+
+  @Test
+  public void mergeFiltersFalse() {
+    Path file = new Path("abc.txt");
+
+    PathFilter firstFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("a");
+      }
+    };
+
+    PathFilter secondFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".csv");
+      }
+    };
+
+    assertFalse("Path should have been excluded from the path list", FileSystemUtil.mergeFilters(firstFilter, secondFilter).accept(file));
+    assertFalse("Path should have been excluded from the path list", FileSystemUtil.mergeFilters(firstFilter, new PathFilter[] {secondFilter}).accept(file));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0c178ba/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java
new file mode 100644
index 0000000..1df25ee
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/FileSystemUtilTestBase.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import com.google.common.base.Strings;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Base test class for file system util classes that, during test initialization,
+ * sets up a file system connection and creates the directories and files needed for unit tests.
+ */
+public class FileSystemUtilTestBase {
+
+  /*
+    Directory and file structure created during test initialization:
+    ../a
+    ../a/f.txt
+    ../a/.f.txt
+    ../a/_f.txt
+
+    ../a/aa
+    ../a/aa/f.txt
+    ../a/aa/.f.txt
+    ../a/aa/_f.txt
+
+    ../b
+    ../b/f.txt
+    ../b/.f.txt
+    ../b/_f.txt
+
+    ../.a
+    ../.a/f.txt
+
+    ../_a
+    ../_a/f.txt
+  */
+  protected static FileSystem fs;
+  protected static Path base;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // initialize file system
+    fs = FileSystem.get(new Configuration());
+
+    // create temporary directory with sub-folders and files
+    final File tempDir = Files.createTempDir();
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        FileUtils.deleteQuietly(tempDir);
+      }
+    });
+    base = new Path(tempDir.toURI().getPath());
+
+    createDefaultStructure(fs, base, "a", 2);
+    createDefaultStructure(fs, base, "b", 1);
+
+    // create hidden directory with file
+    Path hiddenDirectory = new Path(base, ".a");
+    fs.mkdirs(hiddenDirectory);
+    fs.createNewFile(new Path(hiddenDirectory, "f.txt"));
+
+    // create underscore directory with file
+    Path underscoreDirectory = new Path(base, "_a");
+    fs.mkdirs(underscoreDirectory);
+    fs.createNewFile(new Path(underscoreDirectory, "f.txt"));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+  }
+
+  private static void createDefaultStructure(FileSystem fs, Path base, String name, int nesting) throws IOException {
+    Path newBase = base;
+    for (int i = 1; i <= nesting; i++) {
+      Path path = new Path(newBase, Strings.repeat(name, i));
+      fs.mkdirs(path);
+      for (String fileName : Arrays.asList("f.txt", ".f.txt", "_f.txt")) {
+        fs.createNewFile(new Path(path, fileName));
+      }
+      newBase = path;
+    }
+  }
+
+}


[2/5] drill git commit: DRILL-5669: Add a configurable option for minimum memory allocation to buffered ops

Posted by ar...@apache.org.
DRILL-5669: Add a configurable option for minimum memory allocation to buffered ops

closes #879


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6b0e8378
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6b0e8378
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6b0e8378

Branch: refs/heads/master
Commit: 6b0e8378cc198c926f5b1cf95a07fd4c9fc092de
Parents: 35a7ad0
Author: Boaz Ben-Zvi <bo...@BBenZvi-E754-MBP13.local>
Authored: Mon Jul 17 14:21:29 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri Jul 21 16:32:26 2017 +0300

----------------------------------------------------------------------
 .../src/main/java/org/apache/drill/exec/ExecConstants.java  | 9 +++++++++
 .../drill/exec/server/options/SystemOptionManager.java      | 1 +
 .../apache/drill/exec/util/MemoryAllocationUtilities.java   | 8 ++++++--
 3 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6b0e8378/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 5b82d1f..97cb321 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -343,6 +343,15 @@ public interface ExecConstants {
       MAX_QUERY_MEMORY_PER_NODE_KEY, 1024 * 1024, Long.MAX_VALUE, 2 * 1024 * 1024 * 1024L);
 
   /**
+   * Minimum memory allocated to each buffered operator instance.
+   * <p/>
+   * DEFAULT: 40 MB
+   */
+  String MIN_MEMORY_PER_BUFFERED_OP_KEY = "planner.memory.min_memory_per_buffered_op";
+  LongValidator MIN_MEMORY_PER_BUFFERED_OP = new RangeLongValidator(
+      MIN_MEMORY_PER_BUFFERED_OP_KEY, 1024 * 1024, Long.MAX_VALUE, 40 * 1024 * 1024L);
+
+  /**
    * Extra query memory per node for non-blocking operators.
    * NOTE: This option is currently used only for memory estimation.
    * <p/>

http://git-wip-us.apache.org/repos/asf/drill/blob/6b0e8378/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index c2a4d65..3392a21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -144,6 +144,7 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana
       ExecConstants.EARLY_LIMIT0_OPT,
       ExecConstants.ENABLE_MEMORY_ESTIMATION,
       ExecConstants.MAX_QUERY_MEMORY_PER_NODE,
+      ExecConstants.MIN_MEMORY_PER_BUFFERED_OP,
       ExecConstants.NON_BLOCKING_OPERATORS_MEMORY,
       ExecConstants.HASH_JOIN_TABLE_FACTOR,
       ExecConstants.HASH_AGG_TABLE_FACTOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/6b0e8378/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 79b49e4..4580222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -66,11 +66,15 @@ public class MemoryAllocationUtilities {
       final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode);
       logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);
 
+      // User configurable option to allow forcing minimum memory.
+      // Ensure that the buffered ops receive the minimum memory needed to make progress.
+      // Without this, the math might work out to allocate too little memory.
+      final long opMinMem = queryContext.getOptions().getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY).num_val;
+
       for(final PhysicalOperator op : bufferedOpList) {
-        // Ensure that the sort receives the minimum memory needed to make progress.
-        // Without this, the math might work out to allocate too little memory.
 
         long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
+        alloc = Math.max(alloc, opMinMem);
         op.setMaxAllocation(alloc);
       }
     }
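
The new floor reduces the per-operator allocation to a simple clamp. A sketch
with invented numbers (the 40 MB default comes from the validator added to
ExecConstants above; the option can presumably be tuned at runtime through
ALTER SYSTEM, subject to the validator's 1 MB lower bound):

    // Illustrative values only.
    long maxOperatorAlloc  = 10L * 1024 * 1024;  // computed fair share: 10 MB
    long initialAllocation =  1L * 1024 * 1024;  // operator's declared minimum
    long opMinMem          = 40L * 1024 * 1024;  // planner.memory.min_memory_per_buffered_op

    long alloc = Math.max(maxOperatorAlloc, initialAllocation);
    alloc = Math.max(alloc, opMinMem);           // the floor wins here: alloc == 40 MB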


[4/5] drill git commit: DRILL-5083: status.getOutcome() return FAILURE if one of the batches has STOP status (to avoid infinite loop in Merge Join).

Posted by ar...@apache.org.
DRILL-5083: status.getOutcome() return FAILURE if one of the batches has STOP status (to avoid infinite loop in Merge Join).

closes #881


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/368bc38b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/368bc38b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/368bc38b

Branch: refs/heads/master
Commit: 368bc38b162e43c7c9e6ce302ede87bccf019052
Parents: 943a143
Author: Roman Kulyk <ro...@gmail.com>
Authored: Thu Jul 20 16:33:49 2017 +0300
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri Jul 21 19:38:16 2017 +0300

----------------------------------------------------------------------
 .../org/apache/drill/exec/physical/impl/join/JoinStatus.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/368bc38b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 8e48515..527c984 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -138,7 +138,8 @@ public final class JoinStatus {
    *  4. JoinOutcome.SCHEMA_CHANGED : one of the side has change in schema.
    */
   public JoinOutcome getOutcome() {
-    if (!ok) {
+    // on STOP, OUT_OF_MEMORY return FAILURE.
+    if (!ok || eitherMatches(IterOutcome.STOP)) {
       return JoinOutcome.FAILURE;
     }
     if (hasMoreData) {
@@ -162,7 +163,7 @@ public final class JoinStatus {
       return JoinOutcome.WAITING;
     }
     ok = false;
-    // on STOP, OUT_OF_MEMORY return FAILURE.
+
     return JoinOutcome.FAILURE;
   }
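
The guard now maps a STOP from either input directly to FAILURE before any
other outcome branch can claim it, which per the commit title is what breaks
the Merge Join infinite loop. A sketch of the check as written (eitherMatches
is the class's existing helper, assumed here to compare both inputs' last
outcomes against the given one):

    if (!ok || eitherMatches(IterOutcome.STOP)) {
      return JoinOutcome.FAILURE;  // fail fast instead of pulling again from a stopped input
    }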
 


[3/5] drill git commit: DRILL-5665: Add the option planner.force_2phase_aggr to override small inputs

Posted by ar...@apache.org.
DRILL-5665: Add the option planner.force_2phase_aggr to override small inputs

closes #872


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/943a1432
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/943a1432
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/943a1432

Branch: refs/heads/master
Commit: 943a1432943c5aff0db76b8eb2d6cf9273737b6f
Parents: 6b0e837
Author: Boaz Ben-Zvi <bo...@BBenZvi-E754-MBP13.local>
Authored: Tue Jul 18 15:18:08 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri Jul 21 16:32:42 2017 +0300

----------------------------------------------------------------------
 .../org/apache/drill/exec/planner/physical/AggPruleBase.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
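
A hedged usage note: per the commit title the switch is exposed as the option
planner.force_2phase_aggr, so a test against a small input could presumably
enable it with the same test helper used elsewhere in this patch set (the
ALTER SESSION syntax is an assumption here):

    // Assumption: option name taken from the commit title; standard Drill
    // ALTER SESSION syntax for boolean options.
    test("ALTER SESSION SET `planner.force_2phase_aggr` = true");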


http://git-wip-us.apache.org/repos/asf/drill/blob/943a1432/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index 460ee8a..3de5fca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -61,12 +61,11 @@ public abstract class AggPruleBase extends Prule {
   // currently won't generate a 2 phase plan.
   protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
     PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    if ( settings.isForce2phaseAggr() ) { // for testing - force 2 phase aggr
-      return true;
-    }
     RelNode child = call.rel(0).getInputs().get(0);
     boolean smallInput = child.getRows() < settings.getSliceTarget();
-    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
+    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() ||
+        // Can override a small child - e.g., for testing with a small table
+        ( smallInput && ! settings.isForce2phaseAggr() ) ) {
       return false;
     }