You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/07/04 03:00:11 UTC
[2/3] drill git commit: DRILL-5420: ParquetAsyncPgReader goes into
infinite loop during cleanup
DRILL-5420: ParquetAsyncPgReader goes into infinite loop during cleanup
PageQueue is now cleaned up using poll() instead of take(); the blocking take() call constantly got interrupted during cleanup and caused CPU churn.
During a columnReader shutdown, a flag is set to prevent any new page-reading tasks from being submitted.
closes #862
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0e1e6042
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0e1e6042
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0e1e6042
Branch: refs/heads/master
Commit: 0e1e6042f507ac0da2a21d7de3bbed4c0dac549a
Parents: 0b830ef
Author: Kunal Khatua <kk...@maprtech.com>
Authored: Wed Jun 28 17:39:08 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:02:11 2017 -0700
----------------------------------------------------------------------
.../parquet/columnreaders/AsyncPageReader.java | 21 +++++++++++++-------
.../parquet/columnreaders/BatchReader.java | 4 +++-
.../parquet/columnreaders/ColumnReader.java | 11 +++++++---
.../columnreaders/VarLenBinaryReader.java | 4 +++-
4 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 926436c..8c89e3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -130,7 +130,10 @@ class AsyncPageReader extends PageReader {
@Override protected void init() throws IOException {
super.init();
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ //Avoid Init if a shutdown is already in progress even if init() is called once
+ if (!parentColumnReader.isShuttingDown) {
+ asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ }
}
private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
@@ -224,7 +227,7 @@ class AsyncPageReader extends PageReader {
}
//if the queue was full before we took a page out, then there would
// have been no new read tasks scheduled. In that case, schedule a new read.
- if (pageQueueFull) {
+ if (!parentColumnReader.isShuttingDown && pageQueueFull) {
asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
}
}
@@ -256,7 +259,7 @@ class AsyncPageReader extends PageReader {
}
//if the queue was full before we took a page out, then there would
// have been no new read tasks scheduled. In that case, schedule a new read.
- if (pageQueueFull) {
+ if (!parentColumnReader.isShuttingDown && pageQueueFull) {
asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
}
}
@@ -278,6 +281,7 @@ class AsyncPageReader extends PageReader {
}
@Override public void clear() {
+ //Cancelling all existing AsyncPageReaderTasks
while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
try {
Future<Void> f = asyncPageRead.poll();
@@ -298,12 +302,13 @@ class AsyncPageReader extends PageReader {
while (!pageQueue.isEmpty()) {
r = null;
try {
- r = pageQueue.take();
+ r = pageQueue.poll();
if (r == ReadStatus.EMPTY) {
break;
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ //Reporting because we shouldn't get this
+ logger.error(e.getMessage());
} finally {
if (r != null && r.pageData != null) {
r.pageData.release();
@@ -412,6 +417,7 @@ class AsyncPageReader extends PageReader {
try {
PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
int compressedSize = pageHeader.getCompressed_page_size();
+ if ( parent.parentColumnReader.isShuttingDown ) { return null; } //Opportunity to skip expensive Parquet processing
pageData = parent.dataReader.getNext(compressedSize);
bytesRead = compressedSize;
@@ -438,7 +444,7 @@ class AsyncPageReader extends PageReader {
queue.put(readStatus);
// if the queue is not full, schedule another read task immediately. If it is then the consumer
// will schedule a new read task as soon as it removes a page from the queue.
- if (queue.remainingCapacity() > 0) {
+ if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
}
}
@@ -454,6 +460,7 @@ class AsyncPageReader extends PageReader {
}
parent.handleAndThrowException(e, "Exception occurred while reading from disk.");
} finally {
+ //Nothing to do if isShuttingDown.
}
return null;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
index 651c813..367b226 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -68,7 +68,9 @@ public abstract class BatchReader {
ArrayList<Future<Long>> futures = Lists.newArrayList();
for (ColumnReader<?> crs : readState.getColumnReaders()) {
Future<Long> f = crs.processPagesAsync(recordsToRead);
- futures.add(f);
+ if (f != null) {
+ futures.add(f);
+ }
}
Exception exception = null;
for(Future<Long> f: futures){
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 2d8f556..8b6e7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -81,6 +81,8 @@ public abstract class ColumnReader<V extends ValueVector> {
long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
private ExecutorService threadPool;
+ volatile boolean isShuttingDown; //Indicate to not submit any new AsyncPageReader Tasks during clear()
+
protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
this.parentReader = parentReader;
@@ -125,7 +127,7 @@ public abstract class ColumnReader<V extends ValueVector> {
}
public Future<Long> processPagesAsync(long recordsToReadInThisPass){
- Future<Long> r = threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass));
+ Future<Long> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass)));
return r;
}
@@ -143,6 +145,9 @@ public abstract class ColumnReader<V extends ValueVector> {
}
public void clear() {
+ //State to indicate no more tasks to be scheduled
+ isShuttingDown = true;
+
valueVec.clear();
pageReader.clear();
}
@@ -190,8 +195,8 @@ public abstract class ColumnReader<V extends ValueVector> {
return checkVectorCapacityReached();
}
- protected Future<Integer> readRecordsAsync(int recordsToRead){
- Future<Integer> r = threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead));
+ protected Future<Integer> readRecordsAsync(int recordsToRead) {
+ Future<Integer> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead)));
return r;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index b598ac8..900348f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -119,7 +119,9 @@ public class VarLenBinaryReader {
ArrayList<Future<Integer>> futures = Lists.newArrayList();
for (VarLengthColumn<?> columnReader : columns) {
Future<Integer> f = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead);
- futures.add(f);
+ if (f != null) {
+ futures.add(f);
+ }
}
Exception exception = null;
for(Future<Integer> f: futures){