Posted to commits@drill.apache.org by pr...@apache.org on 2017/07/04 03:00:11 UTC

[2/3] drill git commit: DRILL-5420: ParquetAsyncPgReader goes into infinite loop during cleanup

DRILL-5420: ParquetAsyncPgReader goes into infinite loop during cleanup

The PageQueue is now drained using poll() instead of take(), since take() is constantly interrupted during cleanup and causes CPU churn.
During a ColumnReader shutdown, a flag is set to block any new page-reading tasks from being submitted.

closes #862
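
A minimal sketch of the pattern this commit applies (illustrative only: the class, field,
and method names below are not Drill's, and page reading/buffer handling is simplified):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical reader: a worker task fills pageQueue, the consumer drains it in clear().
    class PageFetcher {
      private final LinkedBlockingQueue<byte[]> pageQueue = new LinkedBlockingQueue<>(8);
      private final ExecutorService threadPool = Executors.newSingleThreadExecutor();
      volatile boolean isShuttingDown; // set at the start of clear(); checked before every submit

      Future<?> submitReadTask() {
        if (isShuttingDown) {
          return null; // callers must tolerate null, as BatchReader/VarLenBinaryReader now do
        }
        return threadPool.submit(() -> {
          byte[] page = new byte[4096]; // stand-in for reading and decompressing a real page
          pageQueue.put(page);          // blocks only while the queue is full
          return null;
        });
      }

      void clear() {
        isShuttingDown = true;          // stop new task submissions first
        // poll() returns null immediately on an empty queue, so the drain loop cannot
        // block or spin on interrupts the way take() can during cleanup.
        byte[] page;
        while ((page = pageQueue.poll()) != null) {
          // release the page buffer here (Drill releases the DrillBuf)
        }
        threadPool.shutdownNow();
      }
    }

The ordering matters: setting the volatile flag before draining guarantees that no task
submitted after clear() begins can repopulate the queue, which is what lets the
non-blocking poll() drain terminate.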


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0e1e6042
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0e1e6042
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0e1e6042

Branch: refs/heads/master
Commit: 0e1e6042f507ac0da2a21d7de3bbed4c0dac549a
Parents: 0b830ef
Author: Kunal Khatua <kk...@maprtech.com>
Authored: Wed Jun 28 17:39:08 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:02:11 2017 -0700

----------------------------------------------------------------------
 .../parquet/columnreaders/AsyncPageReader.java  | 21 +++++++++++++-------
 .../parquet/columnreaders/BatchReader.java      |  4 +++-
 .../parquet/columnreaders/ColumnReader.java     | 11 +++++++---
 .../columnreaders/VarLenBinaryReader.java       |  4 +++-
 4 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 926436c..8c89e3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -130,7 +130,10 @@ class AsyncPageReader extends PageReader {
 
   @Override protected void init() throws IOException {
     super.init();
-    asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+    //Avoid Init if a shutdown is already in progress even if init() is called once
+    if (!parentColumnReader.isShuttingDown) {
+      asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+    }
   }
 
   private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
@@ -224,7 +227,7 @@ class AsyncPageReader extends PageReader {
         }
         //if the queue was full before we took a page out, then there would
         // have been no new read tasks scheduled. In that case, schedule a new read.
-        if (pageQueueFull) {
+        if (!parentColumnReader.isShuttingDown && pageQueueFull) {
           asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
         }
       }
@@ -256,7 +259,7 @@ class AsyncPageReader extends PageReader {
             }
             //if the queue was full before we took a page out, then there would
             // have been no new read tasks scheduled. In that case, schedule a new read.
-            if (pageQueueFull) {
+            if (!parentColumnReader.isShuttingDown && pageQueueFull) {
               asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
             }
           }
@@ -278,6 +281,7 @@ class AsyncPageReader extends PageReader {
   }
 
   @Override public void clear() {
+    //Cancelling all existing AsyncPageReaderTasks
     while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
       try {
         Future<Void> f = asyncPageRead.poll();
@@ -298,12 +302,13 @@ class AsyncPageReader extends PageReader {
     while (!pageQueue.isEmpty()) {
       r = null;
       try {
-        r = pageQueue.take();
+        r = pageQueue.poll();
         if (r == ReadStatus.EMPTY) {
           break;
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        //Reporting because we shouldn't get this
+        logger.error(e.getMessage());
       } finally {
         if (r != null && r.pageData != null) {
           r.pageData.release();
@@ -412,6 +417,7 @@ class AsyncPageReader extends PageReader {
       try {
         PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
         int compressedSize = pageHeader.getCompressed_page_size();
+        if ( parent.parentColumnReader.isShuttingDown ) { return null; } //Opportunity to skip expensive Parquet processing
         pageData = parent.dataReader.getNext(compressedSize);
         bytesRead = compressedSize;
 
@@ -438,7 +444,7 @@ class AsyncPageReader extends PageReader {
           queue.put(readStatus);
           // if the queue is not full, schedule another read task immediately. If it is then the consumer
           // will schedule a new read task as soon as it removes a page from the queue.
-          if (queue.remainingCapacity() > 0) {
+          if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
             asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
           }
         }
@@ -454,6 +460,7 @@ class AsyncPageReader extends PageReader {
         }
         parent.handleAndThrowException(e, "Exception occurred while reading from disk.");
       } finally {
+        //Nothing to do if isShuttingDown.
     }
       return null;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
index 651c813..367b226 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -68,7 +68,9 @@ public abstract class BatchReader {
     ArrayList<Future<Long>> futures = Lists.newArrayList();
     for (ColumnReader<?> crs : readState.getColumnReaders()) {
       Future<Long> f = crs.processPagesAsync(recordsToRead);
-      futures.add(f);
+      if (f != null) {
+        futures.add(f);
+      }
     }
     Exception exception = null;
     for(Future<Long> f: futures){

http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 2d8f556..8b6e7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -81,6 +81,8 @@ public abstract class ColumnReader<V extends ValueVector> {
   long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
   private ExecutorService threadPool;
 
+  volatile boolean isShuttingDown; //Indicate to not submit any new AsyncPageReader Tasks during clear()
+
   protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
       ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
     this.parentReader = parentReader;
@@ -125,7 +127,7 @@ public abstract class ColumnReader<V extends ValueVector> {
   }
 
   public Future<Long> processPagesAsync(long recordsToReadInThisPass){
-    Future<Long> r = threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass));
+    Future<Long> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass)));
     return r;
   }
 
@@ -143,6 +145,9 @@ public abstract class ColumnReader<V extends ValueVector> {
   }
 
   public void clear() {
+    //State to indicate no more tasks to be scheduled
+    isShuttingDown = true;
+
     valueVec.clear();
     pageReader.clear();
   }
@@ -190,8 +195,8 @@ public abstract class ColumnReader<V extends ValueVector> {
     return checkVectorCapacityReached();
   }
 
-  protected Future<Integer> readRecordsAsync(int recordsToRead){
-    Future<Integer> r = threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead));
+  protected Future<Integer> readRecordsAsync(int recordsToRead) {
+    Future<Integer> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead)));
     return r;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index b598ac8..900348f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -119,7 +119,9 @@ public class VarLenBinaryReader {
     ArrayList<Future<Integer>> futures = Lists.newArrayList();
     for (VarLengthColumn<?> columnReader : columns) {
       Future<Integer> f = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead);
-      futures.add(f);
+      if (f != null) {
+        futures.add(f);
+      }
     }
     Exception exception = null;
     for(Future<Integer> f: futures){