Posted to commits@drill.apache.org by pa...@apache.org on 2017/06/12 01:27:42 UTC
[4/4] drill git commit: DRILL-5545: Fix issues reported by findbugs in Async Parquet Reader. This closes #847
DRILL-5545: Fix issues reported by findbugs in Async Parquet Reader.
This closes #847
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a7e29876
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a7e29876
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a7e29876
Branch: refs/heads/master
Commit: a7e298760f9c9efa281d61a1d2ec776139f69225
Parents: 610602e
Author: Parth Chandra <pc...@maprtech.com>
Authored: Wed May 17 16:56:13 2017 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Sun Jun 11 17:28:53 2017 -0700
----------------------------------------------------------------------
.../parquet/columnreaders/AsyncPageReader.java | 118 +++++++------------
.../parquet/columnreaders/ColumnReader.java | 17 ++-
.../store/parquet/columnreaders/PageReader.java | 31 ++++-
3 files changed, 83 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
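A note on the main FindBugs finding addressed here: synchronizing on a java.util.concurrent collection (the JLM_JSR166_UTILCONCURRENT warning) is flagged because such collections do their own internal locking, so their monitor does not guard their operations. The patch therefore synchronizes on a dedicated lock object instead of on the LinkedBlockingQueue itself. A minimal, self-contained sketch of that pattern (class and member names here are illustrative, not from the patch):

    import java.util.concurrent.LinkedBlockingQueue;

    class PageProducer {
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(4);
        // Dedicated monitor: FindBugs complains if we synchronize on the
        // concurrent queue itself, since its internal locks are unrelated.
        private final Object queueLock = new Object();

        void enqueue(String page) {
            synchronized (queueLock) {
                // The capacity check and the insert must appear atomic to
                // every thread that also synchronizes on queueLock.
                boolean wasFull = queue.remainingCapacity() == 0;
                queue.offer(page); // non-blocking insert
                if (!wasFull) {
                    // e.g. schedule the next read task immediately
                }
            }
        }
    }

The sketch uses offer() so the producer never blocks while holding the lock; the patch pairs the check-and-put on the producer side with a matching check-and-take on the consumer side.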
http://git-wip-us.apache.org/repos/asf/drill/blob/a7e29876/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 2e94f56..926436c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -39,6 +39,7 @@ import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.xerial.snappy.Snappy;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
@@ -86,17 +87,16 @@ class AsyncPageReader extends PageReader {
private LinkedBlockingQueue<ReadStatus> pageQueue;
private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
private long totalPageValuesRead = 0;
+ private Object pageQueueSyncronize = new Object(); // Object used to synchronize access to the page queue;
+ // FindBugs complains if we synchronize on a concurrent queue directly.
AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
super(parentStatus, fs, path, columnChunkMetaData);
- if (threadPool == null && asyncPageRead == null) {
- threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
- queueSize = parentColumnReader.parentReader.readQueueSize;
- pageQueue = new LinkedBlockingQueue<>((int)queueSize);
- asyncPageRead = new ConcurrentLinkedQueue<>();
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
- }
+ threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+ queueSize = parentColumnReader.parentReader.readQueueSize;
+ pageQueue = new LinkedBlockingQueue<>((int) queueSize);
+ asyncPageRead = new ConcurrentLinkedQueue<>();
}
@Override
@@ -105,22 +105,34 @@ class AsyncPageReader extends PageReader {
if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
try {
assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
- dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
+ long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
+ while (bytesToSkip > 0) {
+ long skipped = dataReader.skip(bytesToSkip);
+ if (skipped > 0) {
+ bytesToSkip -= skipped;
+ } else {
+ // No good way to handle this directly: Guava uses InputStream.available() to
+ // check whether EOF has been reached, but available() is unreliable, so
+ // instead we read (and discard) the remaining bytes.
+ DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
+ if (skipBuf != null) {
+ skipBuf.release();
+ } else {
+ throw new EOFException("End of File reached.");
+ }
+ }
+ }
} catch (IOException e) {
handleAndThrowException(e, "Error Reading dictionary page.");
}
- // parent constructor may call this method before the thread pool is set.
- if (threadPool == null && asyncPageRead == null) {
- threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
- queueSize = parentColumnReader.parentReader.getFragmentContext().getOptions()
- .getOption(ExecConstants.PARQUET_PAGEREADER_QUEUE_SIZE).num_val;
- pageQueue = new LinkedBlockingQueue<ReadStatus>((int)queueSize);
- asyncPageRead = new ConcurrentLinkedQueue<>();
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
- }
}
}
+ @Override protected void init() throws IOException {
+ super.init();
+ asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ }
+
private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
DrillBuf data;
boolean isDictionary = false;
@@ -146,35 +158,6 @@ class AsyncPageReader extends PageReader {
return data;
}
- // Read and decode the dictionary and the header
- private void readDictionaryPage( final ColumnReader<?> parentStatus) throws UserException {
- try {
- Stopwatch timer = Stopwatch.createStarted();
- ReadStatus readStatus = null;
- synchronized(pageQueue) {
- boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
- asyncPageRead.poll().get(); // get the result of execution
- readStatus = pageQueue.take(); // get the data if no exception has been thrown
- assert (readStatus.pageData != null);
- //if the queue was full before we took a page out, then there would
- // have been no new read tasks scheduled. In that case, schedule a new read.
- if (pageQueueFull) {
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
- }
- }
- long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
- stats.timeDiskScanWait.addAndGet(timeBlocked);
- stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
- stats.numDictPageLoads.incrementAndGet();
- stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
- readDictionaryPageData(readStatus, parentStatus);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- handleAndThrowException(e, "Error reading dictionary page.");
- }
- }
-
// Read and decode the dictionary data
private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
throws UserException {
@@ -229,12 +212,11 @@ class AsyncPageReader extends PageReader {
@Override
protected void nextInternal() throws IOException {
ReadStatus readStatus = null;
- String name = parentColumnReader.columnChunkMetaData.toString();
try {
Stopwatch timer = Stopwatch.createStarted();
parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
asyncPageRead.poll().get(); // get the result of execution
- synchronized(pageQueue) {
+ synchronized (pageQueueSyncronize) {
boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
readStatus = pageQueue.take(); // get the data if no exception has been thrown
if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
@@ -259,14 +241,14 @@ class AsyncPageReader extends PageReader {
}
pageHeader = readStatus.getPageHeader();
- // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
- // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
+ // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
+ // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
do {
if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
readDictionaryPageData(readStatus, parentColumnReader);
asyncPageRead.poll().get(); // get the result of execution
- synchronized (pageQueue) {
+ synchronized (pageQueueSyncronize) {
boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
readStatus = pageQueue.take(); // get the data if no exception has been thrown
if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
@@ -278,17 +260,18 @@ class AsyncPageReader extends PageReader {
asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
}
}
- assert (readStatus.pageData != null);
pageHeader = readStatus.getPageHeader();
}
} while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
- pageHeader = readStatus.getPageHeader();
- pageData = getDecompressedPageData(readStatus);
- assert(pageData != null);
+ pageHeader = readStatus.getPageHeader();
+ pageData = getDecompressedPageData(readStatus);
+ assert (pageData != null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- } catch (Exception e){
+ } catch (RuntimeException e) { // Catch this explicitly to satisfy findbugs
+ handleAndThrowException(e, "Error reading page data");
+ } catch (Exception e) {
handleAndThrowException(e, "Error reading page data");
}
@@ -303,13 +286,14 @@ class AsyncPageReader extends PageReader {
} else {
f.get(1, TimeUnit.MILLISECONDS);
}
+ } catch (RuntimeException e) {
+ // Do Nothing
} catch (Exception e) {
// Do nothing.
}
}
//Empty the page queue
- String name = parentColumnReader.columnChunkMetaData.toString();
ReadStatus r;
while (!pageQueue.isEmpty()) {
r = null;
@@ -426,25 +410,10 @@ class AsyncPageReader extends PageReader {
DrillBuf pageData = null;
timer.reset();
try {
- long s = parent.dataReader.getPos();
PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
- //long e = parent.dataReader.getPos();
- //if (logger.isTraceEnabled()) {
- // logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
- //}
int compressedSize = pageHeader.getCompressed_page_size();
- s = parent.dataReader.getPos();
pageData = parent.dataReader.getNext(compressedSize);
bytesRead = compressedSize;
- //e = parent.dataReader.getPos();
- //if (logger.isTraceEnabled()) {
- // DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
- // int endOffset = compressedSize>100?compressedSize-100:0;
- // DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
- // logger
- // .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
- // name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
- //}
synchronized (parent) {
if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
@@ -462,7 +431,10 @@ class AsyncPageReader extends PageReader {
readStatus.setDiskScanTime(timeToRead);
assert (totalValuesRead <= totalValuesCount);
}
- synchronized (queue) {
+ // The synchronized block is still needed: the check for remaining capacity
+ // in the queue and the put must execute atomically.
+ synchronized (parent.pageQueueSyncronize) {
queue.put(readStatus);
// if the queue is not full, schedule another read task immediately. If it is then the consumer
// will schedule a new read task as soon as it removes a page from the queue.
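The skip loop introduced above works around the InputStream.skip() contract: skip() may return 0 (or fewer bytes than requested) even when more data remains, so a single call is not enough, and the fallback is to actually read and discard the remaining bytes. A self-contained sketch of the same technique against a plain InputStream (SkipFully and skipFully are illustrative names, not part of the patch):

    import java.io.ByteArrayInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    final class SkipFully {
        // Skip exactly n bytes. InputStream.skip() may legally return 0 even
        // before EOF, so when it makes no progress we fall back to read(),
        // which does distinguish EOF (-1) from a short read.
        static void skipFully(InputStream in, long n) throws IOException {
            byte[] scratch = new byte[8192];
            while (n > 0) {
                long skipped = in.skip(n);
                if (skipped > 0) {
                    n -= skipped;
                } else {
                    int read = in.read(scratch, 0, (int) Math.min(n, scratch.length));
                    if (read < 0) {
                        throw new EOFException("End of File reached.");
                    }
                    n -= read;
                }
            }
        }

        public static void main(String[] args) throws IOException {
            InputStream in = new ByteArrayInputStream(new byte[100]);
            skipFully(in, 64);
            System.out.println("remaining = " + in.available()); // prints 36
        }
    }

Relatedly, the explicit catch of RuntimeException alongside Exception in nextInternal() answers another FindBugs complaint (REC_CATCH_EXCEPTION): catching bare Exception can silently swallow runtime errors, so the patch names RuntimeException explicitly even though both branches delegate to the same handler.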
http://git-wip-us.apache.org/repos/asf/drill/blob/a7e29876/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 98e1d78..2d8f556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -99,6 +99,17 @@ public abstract class ColumnReader<V extends ValueVector> {
new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
columnChunkMetaData);
}
+ try {
+ pageReader.init();
+ } catch (IOException e) {
+ UserException ex = UserException.dataReadError(e)
+ .message("Error initializing page reader for Parquet file")
+ .pushContext("Row Group Start: ", this.columnChunkMetaData.getStartingPos())
+ .pushContext("Column: ", this.schemaElement.getName())
+ .pushContext("File: ", this.parentReader.getHadoopPath().toString() )
+ .build(logger);
+ throw ex;
+ }
if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
@@ -106,9 +117,7 @@ public abstract class ColumnReader<V extends ValueVector> {
dataTypeLengthInBits = ParquetColumnMetadata.getTypeLengthInBits(columnDescriptor.getType());
}
}
- if(threadPool == null) {
- threadPool = parentReader.getOperatorContext().getScanDecodeExecutor();
- }
+ threadPool = parentReader.getOperatorContext().getScanDecodeExecutor();
}
public int getRecordsReadInCurrentPass() {
@@ -218,7 +227,7 @@ public abstract class ColumnReader<V extends ValueVector> {
public Future<Boolean> readPageAsync() {
Future<Boolean> f = threadPool.submit(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
- return new Boolean(readPage());
+ return Boolean.valueOf(readPage());
}
});
return f;
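The readPageAsync() change is the standard fix for FindBugs' DM_BOOLEAN_CTOR warning: new Boolean(...) allocates a fresh object on every call, while Boolean.valueOf(...) returns the shared Boolean.TRUE / Boolean.FALSE instances. A tiny demonstration:

    public class BoxingDemo {
        public static void main(String[] args) {
            Boolean a = Boolean.valueOf(true);  // returns cached Boolean.TRUE
            Boolean b = Boolean.valueOf(true);  // same cached instance
            System.out.println(a == b);         // true: no allocation occurred
            // new Boolean(true) would create a distinct object on each call,
            // and the Boolean constructor is deprecated in later JDKs.
        }
    }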
http://git-wip-us.apache.org/repos/asf/drill/blob/a7e29876/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 8a783c9..35cdb14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -46,6 +46,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.PrimitiveType;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -143,10 +144,6 @@ class PageReader {
columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), enforceTotalSize,
useFadvise);
}
- dataReader.init();
-
- loadDictionaryIfExists(parentStatus, columnChunkMetaData, dataReader);
-
} catch (IOException e) {
throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
+ path.getName(), e);
@@ -154,11 +151,33 @@ class PageReader {
}
+ protected void init() throws IOException {
+ dataReader.init();
+ loadDictionaryIfExists(parentColumnReader, parentColumnReader.columnChunkMetaData, dataReader);
+ }
+
protected void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus,
final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws IOException {
Stopwatch timer = Stopwatch.createUnstarted();
if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
- dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
+ long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
+ while (bytesToSkip > 0) {
+ long skipped = dataReader.skip(bytesToSkip);
+ if (skipped > 0) {
+ bytesToSkip -= skipped;
+ } else {
+ // No good way to handle this directly: Guava uses InputStream.available() to
+ // check whether EOF has been reached, but available() is unreliable, so
+ // instead we read (and discard) the remaining bytes.
+ DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
+ if (skipBuf != null) {
+ skipBuf.release();
+ } else {
+ throw new EOFException("End of File reached.");
+ }
+ }
+ }
+
long start=dataReader.getPos();
timer.start();
final PageHeader pageHeader = Util.readPageHeader(f);
@@ -366,7 +385,7 @@ class PageReader {
if (pageHeader.type == PageType.DICTIONARY_PAGE) {
pageType = "Dictionary Page";
}
- logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType.toString(),
+ logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType,
this.parentColumnReader.parentReader.hadoopPath,
this.parentColumnReader.columnDescriptor.toString(), start, bytesin, bytesout, time);
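Taken together, the PageReader and ColumnReader changes split construction from initialization: the constructor no longer performs blocking I/O or schedules reads, and the new init() method, called by ColumnReader after construction, does both. This keeps the constructor from publishing a partially constructed object to the worker thread pool. A minimal sketch of that two-phase lifecycle (class names are illustrative, not from the patch):

    import java.io.IOException;

    class BaseReader {
        BaseReader() {
            // phase 1: wire fields only; no I/O, and nothing a subclass
            // override could observe half-constructed
        }
        protected void init() throws IOException {
            // phase 2: open streams, load the dictionary, etc.
        }
    }

    class AsyncReader extends BaseReader {
        @Override
        protected void init() throws IOException {
            super.init();         // base state is complete here
            scheduleFirstRead();  // safe: 'this' is fully constructed
        }
        private void scheduleFirstRead() { /* submit a task to a pool */ }
    }

    class Owner {
        void open() throws IOException {
            BaseReader r = new AsyncReader(); // construct
            r.init();                         // initialize; may throw
        }
    }

In the patch, ColumnReader wraps any IOException thrown by init() in a UserException that carries the file path, column name, and row-group starting offset as context.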