Posted to commits@drill.apache.org by ve...@apache.org on 2015/03/26 16:59:17 UTC

[2/5] drill git commit: DRILL-2323: Added parquet metadata to logs (+filename to JSON reader)

DRILL-2323: Added parquet metadata to logs (+filename to JSON reader)


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c5ec7806
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c5ec7806
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c5ec7806

Branch: refs/heads/master
Commit: c5ec7806384de9acec61ff2ef133c0416b7a2cdb
Parents: f447a9d
Author: Sudheesh Katkam <sk...@maprtech.com>
Authored: Tue Mar 3 11:32:57 2015 -0800
Committer: Sudheesh Katkam <sk...@maprtech.com>
Committed: Wed Mar 25 18:16:30 2015 -0700

----------------------------------------------------------------------
 .../exec/store/easy/json/JSONRecordReader.java  |  4 +--
 .../store/parquet/ParquetScanBatchCreator.java  |  2 +-
 .../columnreaders/ParquetRecordReader.java      | 24 ++++++++++----
 .../exec/store/parquet2/DrillParquetReader.java | 14 ++++++--
 .../parquet/hadoop/ColumnChunkIncReadStore.java | 35 +++++++++++++++-----
 5 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 6fbdf4f..cc7cb83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 public class JSONRecordReader extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
 
   private OutputMutator mutator;
   private VectorContainerWriter writer;
@@ -92,7 +92,7 @@ public class JSONRecordReader extends AbstractRecordReader {
 
   protected void handleAndRaise(String msg, Exception e) {
     StringBuilder sb = new StringBuilder();
-    sb.append(msg).append(" - Parser was at record: ").append(recordCount+1);
+    sb.append(msg).append(" - In ").append(hadoopPath.toUri().getPath()).append(" parser was at record: ").append(recordCount+1);
     if (e instanceof JsonParseException) {
       JsonParseException ex = JsonParseException.class.cast(e);
       sb.append(" column: ").append(ex.getLocation().getColumnNr());

http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index ad1bf32..3ae2b36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -54,7 +54,7 @@ import com.google.common.collect.Lists;
 
 
 public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
 
   private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
   private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";

http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 5b9212c..11d0042 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -60,7 +60,7 @@ import parquet.schema.PrimitiveType;
 import com.google.common.collect.Lists;
 
 public class ParquetRecordReader extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
   // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
   private static final int NUMBER_OF_VECTORS = 1;
@@ -319,13 +319,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
           }
         }
       }
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
     } catch (Exception e) {
-      throw new ExecutionSetupException(e);
+      handleAndRaise("Failure in setting up reader", e);
     }
   }
 
+  protected void handleAndRaise(String s, Exception e) {
+    String message = "Error in parquet record reader.\nMessage: " + s +
+      "\nParquet Metadata: " + footer;
+    throw new DrillRuntimeException(message, e);
+  }
+
   @Override
   public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
     try {
@@ -424,9 +428,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
 //      logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
       totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
       return firstColumnStatus.getRecordsReadInCurrentPass();
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e);
+    } catch (Exception e) {
+      handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
+        "\nTotal records read: " + totalRecordsRead +
+        "\nMock records read: " + mockRecordsRead +
+        "\nRecords to read: " + recordsToRead +
+        "\nRow group index: " + rowGroupIndex +
+        "\nRecords in row group: " + footer.getBlocks().get(rowGroupIndex).getRowCount(), e);
     }
+
+    // this is never reached
+    return 0;
   }
 
   @Override
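
The new handleAndRaise helper rethrows any failure as a DrillRuntimeException carrying the full Parquet footer, and the catch in next() feeds it per-read state (path, record counts, row group index). Since the helper always throws, the trailing "return 0" exists only to satisfy the compiler, as the in-line comment notes. One common way to make that reachability explicit, sketched here with illustrative names rather than Drill's:

    // Sketch: have the helper return the exception so call sites can write
    // "throw raise(...)", letting the compiler prove the path never falls through.
    class AlwaysThrowsSketch {
      static RuntimeException raise(String context, Exception cause) {
        return new RuntimeException("Error in parquet record reader.\n" + context, cause);
      }

      int next() {
        try {
          return doRead();
        } catch (Exception e) {
          throw raise("Hadoop path: /data/part-0.parquet", e); // no dead return needed
        }
      }

      private int doRead() throws Exception { return 0; } // stand-in for the real read
    }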

http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 8778ef8..9d85b67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
@@ -75,7 +76,7 @@ import parquet.schema.Types;
 
 public class DrillParquetReader extends AbstractRecordReader {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
   // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
 
@@ -270,10 +271,16 @@ public class DrillParquetReader extends AbstractRecordReader {
         recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
       }
     } catch (Exception e) {
-      throw new ExecutionSetupException(e);
+      handleAndRaise("Failure in setting up reader", e);
     }
   }
 
+  protected void handleAndRaise(String s, Exception e) {
+    String message = "Error in drill parquet reader (complex).\nMessage: " + s +
+      "\nParquet Metadata: " + footer;
+    throw new DrillRuntimeException(message, e);
+  }
+
   private static Type getType(String[] pathSegments, int depth, MessageType schema) {
     Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
     if (depth + 1 == pathSegments.length) {
@@ -312,7 +319,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       if (count % fillLevelCheckFrequency == 0) {
         if (getPercentFilled() > fillLevelCheckThreshold) {
           if(!recordMaterializer.ok()){
-            throw new RuntimeException(String.format("The setting for `%s` is too high for your Parquet records. Please set a lower check threshold and retry your query. ", ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD));
+            String message = String.format("The setting for `%s` is too high for your Parquet records. Please set a lower check threshold and retry your query. ", ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD);
+            handleAndRaise(message, new RuntimeException(message));
           }
           break;
         }
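
One detail worth noting in this hunk: the fill-threshold branch builds a message, wraps it in a fresh RuntimeException, and then passes both to handleAndRaise, so the same text appears twice in the resulting DrillRuntimeException, once in the message and once in the cause. A minimal sketch of that call shape; the footer field is assumed from the enclosing class, which the hunk does not show:

    import org.apache.drill.common.exceptions.DrillRuntimeException;

    // Sketch of the call shape above; "footer" stands in for the reader's
    // parquet.hadoop.metadata.ParquetMetadata field.
    class DoubleWrapSketch {
      private final Object footer = "ParquetMetadata{...}"; // illustrative only

      void handleAndRaise(String s, Exception e) {
        String message = "Error in drill parquet reader (complex).\nMessage: " + s +
            "\nParquet Metadata: " + footer;
        throw new DrillRuntimeException(message, e);
      }

      void failOnThreshold(String message) {
        // The message is both the helper's argument and the cause's message,
        // so it shows up twice in the final stack trace.
        handleAndRaise(message, new RuntimeException(message));
      }
    }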

http://git-wip-us.apache.org/repos/asf/drill/blob/c5ec7806/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index f2fe376..1125435 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -92,9 +93,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     @Override
     public DictionaryPage readDictionaryPage() {
       if (dictionaryPage == null) {
+        PageHeader pageHeader = new PageHeader();
+        long pos = 0;
         try {
-          long pos = in.getPos();
-          PageHeader pageHeader = Util.readPageHeader(in);
+          pos = in.getPos();
+          pageHeader = Util.readPageHeader(in);
           if (pageHeader.getDictionary_page_header() == null) {
             in.seek(pos);
             return null;
@@ -105,8 +108,16 @@ public class ColumnChunkIncReadStore implements PageReadStore {
                           pageHeader.getDictionary_page_header().getNum_values(),
                           parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
                   );
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+        } catch (Exception e) {
+          throw new DrillRuntimeException("Error reading dictionary page." +
+            "\nFile path: " + path.toUri().getPath() +
+            "\nRow count: " + rowCount +
+            "\nColumn Chunk Metadata: " + metaData +
+            "\nPage Header: " + pageHeader +
+            "\nFile offset: " + fileOffset +
+            "\nSize: " + size +
+            "\nValue read so far: " + valueReadSoFar +
+            "\nPosition: " + pos, e);
         }
       }
       return dictionaryPage;
@@ -119,13 +130,14 @@ public class ColumnChunkIncReadStore implements PageReadStore {
 
     @Override
     public Page readPage() {
+      PageHeader pageHeader = new PageHeader();
       try {
         if (lastPage != null) {
           lastPage.release();
           lastPage = null;
         }
         while(valueReadSoFar < metaData.getValueCount()) {
-          PageHeader pageHeader = Util.readPageHeader(in);
+          pageHeader = Util.readPageHeader(in);
           switch (pageHeader.type) {
             case DICTIONARY_PAGE:
               if (dictionaryPage == null) {
@@ -151,7 +163,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
                       decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
                       pageHeader.data_page_header.num_values,
                       pageHeader.uncompressed_page_size,
-                      parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
+                      ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
@@ -163,8 +175,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
         }
         in.close();
         return null;
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+      } catch (Exception e) {
+        throw new DrillRuntimeException("Error reading page." +
+          "\nFile path: " + path.toUri().getPath() +
+          "\nRow count: " + rowCount +
+          "\nColumn Chunk Metadata: " + metaData +
+          "\nPage Header: " + pageHeader +
+          "\nFile offset: " + fileOffset +
+          "\nSize: " + size +
+          "\nValue read so far: " + valueReadSoFar, e);
       }
     }
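
Both readDictionaryPage and readPage hoist pageHeader (and, on the dictionary path, pos) out of the try block and pre-initialize them, so the catch can always report the last header read, or an empty one if the failure came before the first Util.readPageHeader call. A hedged sketch of that hoist-for-diagnostics pattern, with a simplified stand-in for parquet.format.PageHeader:

    // Sketch: declare diagnostic state before the try so the catch block can
    // report it even when the failure happens mid-read.
    class HoistForDiagnosticsSketch {
      static final class PageHeader {                    // simplified stand-in type
        @Override public String toString() { return "PageHeader(empty)"; }
      }

      PageHeader readHeader() throws Exception { return new PageHeader(); }

      void readPage() {
        PageHeader pageHeader = new PageHeader();        // never null in the catch
        long pos = 0;                                    // last known stream position
        try {
          pos = 42;                                      // stand-in for in.getPos()
          pageHeader = readHeader();                     // may throw mid-read
          // ... decompress and decode the page ...
        } catch (Exception e) {
          // Both locals are definitely assigned here, even on early failure.
          throw new RuntimeException("Error reading page." +
              "\nPage Header: " + pageHeader +
              "\nPosition: " + pos, e);
        }
      }
    }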