Posted to commits@drill.apache.org by ve...@apache.org on 2015/03/26 16:59:17 UTC
[2/5] drill git commit: DRILL-2323: Added parquet metadata to logs (+filename to JSON reader)
DRILL-2323: Added parquet metadata to logs (+filename to JSON reader)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5ec7806
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5ec7806
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5ec7806
Branch: refs/heads/master
Commit: c5ec7806384de9acec61ff2ef133c0416b7a2cdb
Parents: f447a9d
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Tue Mar 3 11:32:57 2015 -0800
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Wed Mar 25 18:16:30 2015 -0700
----------------------------------------------------------------------
.../exec/store/easy/json/JSONRecordReader.java | 4 +--
.../store/parquet/ParquetScanBatchCreator.java | 2 +-
.../columnreaders/ParquetRecordReader.java | 24 ++++++++++----
.../exec/store/parquet2/DrillParquetReader.java | 14 ++++++--
.../parquet/hadoop/ColumnChunkIncReadStore.java | 35 +++++++++++++++-----
5 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
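At a glance, the pattern across these five files: replace bare rethrows (throw new RuntimeException(e)) with helpers that fold file and footer context into the message before raising DrillRuntimeException. A minimal standalone sketch of that pattern follows; ContextualReader, the field names, and the plain RuntimeException are illustrative stand-ins, not Drill's actual types:

    import java.io.IOException;

    // Illustrative sketch only: ContextualReader stands in for Drill's record
    // readers; RuntimeException stands in for DrillRuntimeException.
    class ContextualReader {
      private final String filePath;   // cf. hadoopPath.toUri().getPath()
      private final String footerInfo; // cf. the parquet footer's toString()

      ContextualReader(String filePath, String footerInfo) {
        this.filePath = filePath;
        this.footerInfo = footerInfo;
      }

      // Mirrors the handleAndRaise(...) helpers added below: fold whatever
      // context is available into the message, keep the original as the cause.
      protected void handleAndRaise(String msg, Exception e) {
        throw new RuntimeException("Error in parquet record reader." +
            "\nMessage: " + msg +
            "\nFile path: " + filePath +
            "\nParquet Metadata: " + footerInfo, e);
      }

      int next() {
        try {
          throw new IOException("simulated read failure");
        } catch (Exception e) {
          handleAndRaise("Failure reading batch", e);
        }
        return 0; // never reached; satisfies the compiler (see ParquetRecordReader below)
      }
    }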
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 6fbdf4f..cc7cb83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
public class JSONRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
private OutputMutator mutator;
private VectorContainerWriter writer;
@@ -92,7 +92,7 @@ public class JSONRecordReader extends AbstractRecordReader {
protected void handleAndRaise(String msg, Exception e) {
StringBuilder sb = new StringBuilder();
- sb.append(msg).append(" - Parser was at record: ").append(recordCount+1);
+ sb.append(msg).append(" - In ").append(hadoopPath.toUri().getPath()).append(" parser was at record: ").append(recordCount+1);
if (e instanceof JsonParseException) {
JsonParseException ex = JsonParseException.class.cast(e);
sb.append(" column: ").append(ex.getLocation().getColumnNr());
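For the JSON reader, the net effect is that parse errors now name the file being read alongside the record and column position. A small self-contained sketch of the same message assembly, assuming the Jackson 2.x streaming API; the path and record counter here are stand-ins for the reader's fields:

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParseException;
    import com.fasterxml.jackson.core.JsonParser;

    public class JsonErrorContextDemo {
      public static void main(String[] args) throws Exception {
        String path = "/tmp/data.json"; // stands in for hadoopPath.toUri().getPath()
        int recordCount = 0;            // stands in for the reader's running record count
        try (JsonParser parser = new JsonFactory().createParser("{ \"a\": }")) {
          while (parser.nextToken() != null) { /* drive the parser into the bad token */ }
        } catch (JsonParseException ex) {
          StringBuilder sb = new StringBuilder();
          sb.append("Parse failure").append(" - In ").append(path)
            .append(" parser was at record: ").append(recordCount + 1)
            .append(" column: ").append(ex.getLocation().getColumnNr());
          System.err.println(sb);
        }
      }
    }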
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index ad1bf32..3ae2b36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -54,7 +54,7 @@ import com.google.common.collect.Lists;
public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 5b9212c..11d0042 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -60,7 +60,7 @@ import parquet.schema.PrimitiveType;
import com.google.common.collect.Lists;
public class ParquetRecordReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
// this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
private static final int NUMBER_OF_VECTORS = 1;
@@ -319,13 +319,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
}
}
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException(e);
} catch (Exception e) {
- throw new ExecutionSetupException(e);
+ handleAndRaise("Failure in setting up reader", e);
}
}
+ protected void handleAndRaise(String s, Exception e) {
+ String message = "Error in parquet record reader.\nMessage: " + s +
+ "\nParquet Metadata: " + footer;
+ throw new DrillRuntimeException(message, e);
+ }
+
@Override
public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
try {
@@ -424,9 +428,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
// logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
return firstColumnStatus.getRecordsReadInCurrentPass();
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
+ } catch (Exception e) {
+ handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
+ "\nTotal records read: " + totalRecordsRead +
+ "\nMock records read: " + mockRecordsRead +
+ "\nRecords to read: " + recordsToRead +
+ "\nRow group index: " + rowGroupIndex +
+ "\nRecords in row group: " + footer.getBlocks().get(rowGroupIndex).getRowCount(), e);
}
+
+ // this is never reached
+ return 0;
}
@Override
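The unreachable "return 0" above is needed because javac cannot prove that handleAndRaise always throws. An alternative idiom, sketched here with hypothetical names rather than the commit's actual code, declares the helper to return the exception type so the call site can write "throw handleAndRaise(...)" and no dead return is required:

    class ReaderSketch {
      // Alternative sketch (not what the commit does): a RuntimeException
      // return type lets callers write `throw handleAndRaise(...)`, so javac
      // sees the path as terminated -- no dead `return 0` needed.
      protected RuntimeException handleAndRaise(String msg, Exception e) {
        throw new RuntimeException("Error in parquet record reader.\nMessage: " + msg, e);
      }

      int readBatch(boolean fail) {
        try {
          if (fail) {
            throw new java.io.IOException("simulated failure");
          }
          return 1;
        } catch (Exception e) {
          throw handleAndRaise("Failure reading batch", e);
        }
      }
    }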
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 8778ef8..9d85b67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Set;
import com.google.common.collect.Sets;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
@@ -75,7 +76,7 @@ import parquet.schema.Types;
public class DrillParquetReader extends AbstractRecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
// same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
@@ -270,10 +271,16 @@ public class DrillParquetReader extends AbstractRecordReader {
recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
}
} catch (Exception e) {
- throw new ExecutionSetupException(e);
+ handleAndRaise("Failure in setting up reader", e);
}
}
+ protected void handleAndRaise(String s, Exception e) {
+ String message = "Error in drill parquet reader (complex).\nMessage: " + s +
+ "\nParquet Metadata: " + footer;
+ throw new DrillRuntimeException(message, e);
+ }
+
private static Type getType(String[] pathSegments, int depth, MessageType schema) {
Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
if (depth + 1 == pathSegments.length) {
@@ -312,7 +319,8 @@ public class DrillParquetReader extends AbstractRecordReader {
if (count % fillLevelCheckFrequency == 0) {
if (getPercentFilled() > fillLevelCheckThreshold) {
if(!recordMaterializer.ok()){
- throw new RuntimeException(String.format("The setting for `%s` is too high for your Parquet records. Please set a lower check threshold and retry your query. ", ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD));
+ String message = String.format("The setting for `%s` is too high for your Parquet records. Please set a lower check threshold and retry your query. ", ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD);
+ handleAndRaise(message, new RuntimeException(message));
}
break;
}
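Because both parquet readers now funnel failures through handleAndRaise, the original exception is preserved as the cause of the wrapping DrillRuntimeException. A tiny sketch demonstrating that the chain survives; a plain RuntimeException stands in for DrillRuntimeException:

    public class CauseChainDemo {
      public static void main(String[] args) {
        try {
          try {
            throw new IllegalStateException("root failure");
          } catch (Exception e) {
            // Mirrors handleAndRaise: fold context into the message, keep the cause.
            throw new RuntimeException(
                "Error in drill parquet reader (complex)." +
                "\nMessage: Failure in setting up reader", e);
          }
        } catch (RuntimeException wrapped) {
          // The original exception stays reachable for logging and diagnosis.
          System.out.println(wrapped.getMessage());
          System.out.println("cause: " + wrapped.getCause());
        }
      }
    }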
http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index f2fe376..1125435 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -92,9 +93,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
@Override
public DictionaryPage readDictionaryPage() {
if (dictionaryPage == null) {
+ PageHeader pageHeader = new PageHeader();
+ long pos = 0;
try {
- long pos = in.getPos();
- PageHeader pageHeader = Util.readPageHeader(in);
+ pos = in.getPos();
+ pageHeader = Util.readPageHeader(in);
if (pageHeader.getDictionary_page_header() == null) {
in.seek(pos);
return null;
@@ -105,8 +108,16 @@ public class ColumnChunkIncReadStore implements PageReadStore {
pageHeader.getDictionary_page_header().getNum_values(),
parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Error reading dictionary page." +
+ "\nFile path: " + path.toUri().getPath() +
+ "\nRow count: " + rowCount +
+ "\nColumn Chunk Metadata: " + metaData +
+ "\nPage Header: " + pageHeader +
+ "\nFile offset: " + fileOffset +
+ "\nSize: " + size +
+ "\nValue read so far: " + valueReadSoFar +
+ "\nPosition: " + pos, e);
}
}
return dictionaryPage;
@@ -119,13 +130,14 @@ public class ColumnChunkIncReadStore implements PageReadStore {
@Override
public Page readPage() {
+ PageHeader pageHeader = new PageHeader();
try {
if (lastPage != null) {
lastPage.release();
lastPage = null;
}
while(valueReadSoFar < metaData.getValueCount()) {
- PageHeader pageHeader = Util.readPageHeader(in);
+ pageHeader = Util.readPageHeader(in);
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage == null) {
@@ -151,7 +163,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
- parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
+ ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
@@ -163,8 +175,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
}
in.close();
return null;
- } catch (IOException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new DrillRuntimeException("Error reading page." +
+ "\nFile path: " + path.toUri().getPath() +
+ "\nRow count: " + rowCount +
+ "\nColumn Chunk Metadata: " + metaData +
+ "\nPage Header: " + pageHeader +
+ "\nFile offset: " + fileOffset +
+ "\nSize: " + size +
+ "\nValue read so far: " + valueReadSoFar, e);
}
}
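Both hunks in this file use the same trick: pageHeader (and pos) are declared and pre-initialized before the try block so the catch handler can report whatever was successfully read before the failure. A standalone sketch of that idiom, with illustrative values in place of the real I/O:

    import java.io.IOException;

    public class HoistForContextDemo {
      public static void main(String[] args) {
        // Declared and pre-initialized *outside* the try, so the catch block
        // can still report the last value read before the failure.
        String pageHeader = "<no header read yet>";
        long pos = 0;
        try {
          pos = 42;                                   // cf. in.getPos()
          pageHeader = "PageHeader(type=DATA_PAGE)";  // cf. Util.readPageHeader(in)
          throw new IOException("simulated mid-read failure");
        } catch (Exception e) {
          // cf. the DrillRuntimeException built in the hunks above
          System.err.println("Error reading page." +
              "\nPage Header: " + pageHeader +
              "\nPosition: " + pos +
              "\nCause: " + e);
        }
      }
    }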