You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2016/02/15 03:00:40 UTC

[2/2] tajo git commit: TAJO-2073: Upgrade parquet-mr to 1.8.1.

TAJO-2073: Upgrade parquet-mr to 1.8.1.

Closes #958


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ef43dfaa
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ef43dfaa
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ef43dfaa

Branch: refs/heads/master
Commit: ef43dfaa5a69de8d1f3df3c23598d7b92758bacd
Parents: ea1818a
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Feb 15 10:59:46 2016 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Feb 15 10:59:46 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 tajo-storage/tajo-storage-hdfs/pom.xml          |  17 +-
 .../tajo/storage/parquet/ParquetAppender.java   |   6 +-
 .../tajo/storage/parquet/ParquetScanner.java    |  32 +-
 .../tajo/storage/parquet/TajoParquetReader.java |  35 +-
 .../tajo/storage/parquet/TajoParquetWriter.java |   4 +-
 .../tajo/storage/parquet/TajoReadSupport.java   |  12 +-
 .../storage/parquet/TajoRecordConverter.java    |  12 +-
 .../storage/parquet/TajoRecordMaterializer.java |   6 +-
 .../storage/parquet/TajoSchemaConverter.java    |  10 +-
 .../tajo/storage/parquet/TajoWriteSupport.java  |  12 +-
 .../tajo/storage/text/DelimitedTextFile.java    |   2 +-
 .../thirdparty/parquet/CodecFactory.java        | 190 -------
 .../parquet/ColumnChunkPageWriteStore.java      | 206 --------
 .../parquet/InternalParquetRecordReader.java    | 180 ++++---
 .../parquet/InternalParquetRecordWriter.java    | 160 ------
 .../thirdparty/parquet/ParquetFileWriter.java   | 492 -------------------
 .../thirdparty/parquet/ParquetReader.java       | 170 +++++--
 .../thirdparty/parquet/ParquetWriter.java       | 224 ---------
 .../org/apache/tajo/storage/TestStorages.java   |  64 ++-
 .../tajo/storage/parquet/TestReadWrite.java     |   3 +-
 .../storage/parquet/TestSchemaConverter.java    |   4 +-
 22 files changed, 388 insertions(+), 1455 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a932c65..2f31255 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-2073: Upgrade parquet-mr to 1.8.1. (jinho)
+
     TAJO-2052: Upgrading ORC reader version. (Jongyoung Park via jaehwa)
     
     TAJO-1940: Implement HBaseTablespace::getTableVolume() method. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 6c10a88..50bfa15 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -34,8 +34,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <parquet.version>1.5.0</parquet.version>
-    <parquet.format.version>2.1.0</parquet.format.version>
+    <parquet.version>1.8.1</parquet.version>
   </properties>
 
   <repositories>
@@ -337,21 +336,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-column</artifactId>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop-bundle</artifactId>
       <version>${parquet.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-hadoop</artifactId>
-      <version>${parquet.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-format</artifactId>
-      <version>${parquet.format.version}</version>
-    </dependency>
-    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index e7b523a..423869d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -29,8 +29,8 @@ import org.apache.tajo.storage.FileAppender;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.TableStatistics;
 import org.apache.tajo.storage.Tuple;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;
 
@@ -121,7 +121,7 @@ public class ParquetAppender extends FileAppender {
   }
 
   public long getEstimatedOutputSize() throws IOException {
-    return writer.getEstimatedWrittenSize();
+    return writer.getDataSize();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
index d7f753c..0c4749c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -21,9 +21,10 @@ package org.apache.tajo.storage.parquet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.NotImplementedException;
 import org.apache.tajo.exception.TajoRuntimeException;
-import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.storage.EmptyTuple;
 import org.apache.tajo.storage.FileScanner;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.fragment.Fragment;
@@ -35,6 +36,10 @@ import java.io.IOException;
  */
 public class ParquetScanner extends FileScanner {
   private TajoParquetReader reader;
+  /** The number of actual read records */
+  private long currentRowCount;
+  private long totalRowCount;
+  private boolean closed;
 
   /**
    * Creates a new ParquetScanner.
@@ -58,7 +63,10 @@ public class ParquetScanner extends FileScanner {
     if (targets == null) {
       targets = schema.toArray();
     }
-    reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets));
+    reader = new TajoParquetReader(conf, fragment.getPath(), schema, new Schema(targets));
+    totalRowCount = reader.getTotalRowCount();
+    currentRowCount = 0;
+    closed = false;
     super.init();
   }
 
@@ -70,6 +78,16 @@ public class ParquetScanner extends FileScanner {
    */
   @Override
   public Tuple next() throws IOException {
+    // If there is no required column, we just read footer and then return an empty tuple
+    if (targets.length == 0) {
+      if(currentRowCount == totalRowCount) {
+        return null;
+      } else {
+        currentRowCount++;
+        return EmptyTuple.get();
+      }
+    }
+
     return reader.read();
   }
 
@@ -78,6 +96,7 @@ public class ParquetScanner extends FileScanner {
    */
   @Override
   public void reset() throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
   }
 
   /**
@@ -88,6 +107,7 @@ public class ParquetScanner extends FileScanner {
     if (reader != null) {
       reader.close();
     }
+    closed = true;
   }
 
   /**
@@ -112,7 +132,7 @@ public class ParquetScanner extends FileScanner {
 
   @Override
   public void setFilter(EvalNode filter) {
-    throw new TajoRuntimeException(new UnsupportedException());
+    throw new TajoRuntimeException(new NotImplementedException());
   }
 
   /**
@@ -131,7 +151,11 @@ public class ParquetScanner extends FileScanner {
     if (!inited) {
       return super.getProgress();
     } else {
-      return reader.getProgress();
+      if (closed) {
+        return 1.0f;
+      } else {
+        return reader.getProgress();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
index a765f48..8d70c48 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
@@ -18,11 +18,12 @@
 
 package org.apache.tajo.storage.parquet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
-import parquet.filter.UnboundRecordFilter;
 
 import java.io.IOException;
 
@@ -32,54 +33,34 @@ import java.io.IOException;
  * directly.
  */
 public class TajoParquetReader extends ParquetReader<Tuple> {
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   */
-  public TajoParquetReader(Path file, Schema readSchema) throws IOException {
-    super(file, new TajoReadSupport(readSchema));
-  }
 
   /**
    * Creates a new TajoParquetReader.
    *
+   * @param conf the configuration
    * @param file The file to read from.
    * @param readSchema Tajo schema of the table.
    * @param requestedSchema Tajo schema of the projection.
    */
-  public TajoParquetReader(Path file, Schema readSchema,
+  public TajoParquetReader(Configuration conf, Path file, Schema readSchema,
                            Schema requestedSchema) throws IOException {
-    super(file, new TajoReadSupport(readSchema, requestedSchema));
-  }
-
-  /**
-   * Creates a new TajoParquetReader.
-   *
-   * @param file The file to read from.
-   * @param readSchema Tajo schema of the table.
-   * @param recordFilter Record filter.
-   */
-  public TajoParquetReader(Path file, Schema readSchema,
-                           UnboundRecordFilter recordFilter)
-      throws IOException {
-    super(file, new TajoReadSupport(readSchema), recordFilter);
+    super(conf, file, new TajoReadSupport(readSchema, requestedSchema));
   }
 
   /**
    * Creates a new TajoParquetReader.
    *
+   * @param conf the configuration
    * @param file The file to read from.
    * @param readSchema Tajo schema of the table.
    * @param requestedSchema Tajo schema of the projection.
    * @param recordFilter Record filter.
    */
-  public TajoParquetReader(Path file, Schema readSchema,
+  public TajoParquetReader(Configuration conf, Path file, Schema readSchema,
                            Schema requestedSchema,
                            UnboundRecordFilter recordFilter)
       throws IOException {
-    super(file, new TajoReadSupport(readSchema, requestedSchema),
+    super(conf, file, new TajoReadSupport(readSchema, requestedSchema),
           recordFilter);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
index 5f220c5..2217ed2 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
@@ -21,8 +21,8 @@ package org.apache.tajo.storage.parquet;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
index a64e987..a3e45e3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
@@ -19,13 +19,13 @@
 package org.apache.tajo.storage.parquet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Log;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import parquet.Log;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
 
 import java.util.Map;
 
@@ -69,7 +69,7 @@ public class TajoReadSupport extends ReadSupport<Tuple> {
    * @return A ReadContext that defines how to read the file.
    */
   @Override
-  public ReadContext init(InitContext context) {
+  public ReadSupport.ReadContext init(InitContext context) {
     if (requestedSchema == null) {
       throw new RuntimeException("requestedSchema is null.");
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7d73021..d3bbed1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -27,12 +27,12 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
-import parquet.io.api.Binary;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.GroupType;
-import parquet.schema.Type;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
index 25610fc..f5a84de 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
@@ -21,9 +21,9 @@ package org.apache.tajo.storage.parquet;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
 
 /**
  * Materializes a Tajo Tuple from a stream of Parquet data.

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
index 3d573f6..dfe6af8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
@@ -18,14 +18,14 @@
 
 package org.apache.tajo.storage.parquet;
 
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
 
 import java.util.ArrayList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index e5ad28c..e5d73f5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -24,12 +24,12 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
 import org.apache.tajo.storage.Tuple;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 12ab738..e112b0d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -268,7 +268,7 @@ public class DelimitedTextFile {
 
     private final long endOffset;
     /** The number of actual read records */
-    private int recordCount = 0;
+    private long recordCount = 0;
 
     private DelimitedLineReader reader;
     private TextLineDeserializer deserializer;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
deleted file mode 100644
index fcaf2f3..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.ReflectionUtils;
-import parquet.bytes.BytesInput;
-import parquet.hadoop.BadConfigurationException;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-class CodecFactory {
-
-  public static class BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    public BytesDecompressor(CompressionCodec codec) {
-      this.codec = codec;
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        decompressor.reset();
-        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
-        decompressed = BytesInput.from(is, uncompressedSize);
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
-    }
-
-    private void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
-  }
-
-  /**
-   * Encapsulates the logic around hadoop compression
-   *
-   * @author Julien Le Dem
-   *
-   */
-  public static class BytesCompressor {
-
-    private final CompressionCodec codec;
-    private final Compressor compressor;
-    private final ByteArrayOutputStream compressedOutBuffer;
-    private final CompressionCodecName codecName;
-
-    public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
-      this.codecName = codecName;
-      this.codec = codec;
-      if (codec != null) {
-        this.compressor = CodecPool.getCompressor(codec);
-        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
-      } else {
-        this.compressor = null;
-        this.compressedOutBuffer = null;
-      }
-    }
-
-    public BytesInput compress(BytesInput bytes) throws IOException {
-      final BytesInput compressedBytes;
-      if (codec == null) {
-        compressedBytes = bytes;
-      } else {
-        compressedOutBuffer.reset();
-        if (compressor != null) {
-          // null compressor for non-native gzip
-          compressor.reset();
-        }
-        CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
-        bytes.writeAllTo(cos);
-        cos.finish();
-        cos.close();
-        compressedBytes = BytesInput.from(compressedOutBuffer);
-      }
-      return compressedBytes;
-    }
-
-    private void release() {
-      if (compressor != null) {
-        CodecPool.returnCompressor(compressor);
-      }
-    }
-
-    public CompressionCodecName getCodecName() {
-      return codecName;
-    }
-
-  }
-
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<>();
-  private final Map<String, CompressionCodec> codecByName = new HashMap<>();
-  private final Configuration configuration;
-
-  public CodecFactory(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  /**
-   *
-   * @param codecName the requested codec
-   * @return the corresponding hadoop codec. null if UNCOMPRESSED
-   */
-  private CompressionCodec getCodec(CompressionCodecName codecName) {
-    String codecClassName = codecName.getHadoopCompressionCodecClassName();
-    if (codecClassName == null) {
-      return null;
-    }
-    CompressionCodec codec = codecByName.get(codecClassName);
-    if (codec != null) {
-      return codec;
-    }
-
-    try {
-      Class<?> codecClass = Class.forName(codecClassName);
-      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
-      codecByName.put(codecClassName, codec);
-      return codec;
-    } catch (ClassNotFoundException e) {
-      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
-    }
-  }
-
-  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
-    BytesCompressor comp = compressors.get(codecName);
-    if (comp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      comp = new BytesCompressor(codecName, codec, pageSize);
-      compressors.put(codecName, comp);
-    }
-    return comp;
-  }
-
-  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
-    BytesDecompressor decomp = decompressors.get(codecName);
-    if (decomp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      decomp = new BytesDecompressor(codec);
-      decompressors.put(codecName, decomp);
-    }
-    return decomp;
-  }
-
-  public void release() {
-    for (BytesCompressor compressor : compressors.values()) {
-      compressor.release();
-    }
-    compressors.clear();
-    for (BytesDecompressor decompressor : decompressors.values()) {
-      decompressor.release();
-    }
-    decompressors.clear();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
deleted file mode 100644
index 6507b80..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.BooleanStatistics;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.DEBUG;
-
-class ColumnChunkPageWriteStore implements PageWriteStore {
-  private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
-
-  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-
-  private static final class ColumnChunkPageWriter implements PageWriter {
-
-    private final ColumnDescriptor path;
-    private final BytesCompressor compressor;
-
-    private final CapacityByteArrayOutputStream buf;
-    private DictionaryPage dictionaryPage;
-
-    private long uncompressedLength;
-    private long compressedLength;
-    private long totalValueCount;
-    private int pageCount;
-
-    private Set<Encoding> encodings = new HashSet<>();
-
-    private Statistics totalStatistics;
-
-    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
-      this.path = path;
-      this.compressor = compressor;
-      this.buf = new CapacityByteArrayOutputStream(initialSize);
-      this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
-    }
-
-    @Deprecated
-    @Override
-    public void writePage(BytesInput bytes,
-                          int valueCount,
-                          Encoding rlEncoding,
-                          Encoding dlEncoding,
-                          Encoding valuesEncoding) throws IOException {
-      long uncompressedSize = bytes.size();
-      BytesInput compressedBytes = compressor.compress(bytes);
-      long compressedSize = compressedBytes.size();
-      BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
-      parquetMetadataConverter.writeDataPageHeader(
-          (int)uncompressedSize,
-          (int)compressedSize,
-          valueCount,
-          statistics,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          buf);
-      this.uncompressedLength += uncompressedSize;
-      this.compressedLength += compressedSize;
-      this.totalValueCount += valueCount;
-      this.pageCount += 1;
-      compressedBytes.writeAllTo(buf);
-      encodings.add(rlEncoding);
-      encodings.add(dlEncoding);
-      encodings.add(valuesEncoding);
-    }
-
-    @Override
-    public void writePage(BytesInput bytes,
-                          int valueCount,
-                          Statistics statistics,
-                          Encoding rlEncoding,
-                          Encoding dlEncoding,
-                          Encoding valuesEncoding) throws IOException {
-      long uncompressedSize = bytes.size();
-      BytesInput compressedBytes = compressor.compress(bytes);
-      long compressedSize = compressedBytes.size();
-      parquetMetadataConverter.writeDataPageHeader(
-          (int)uncompressedSize,
-          (int)compressedSize,
-          valueCount,
-          statistics,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          buf);
-      this.uncompressedLength += uncompressedSize;
-      this.compressedLength += compressedSize;
-      this.totalValueCount += valueCount;
-      this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
-      compressedBytes.writeAllTo(buf);
-      encodings.add(rlEncoding);
-      encodings.add(dlEncoding);
-      encodings.add(valuesEncoding);
-    }
-
-    @Override
-    public long getMemSize() {
-      return buf.size();
-    }
-
-    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.startColumn(path, totalValueCount, compressor.getCodecName());
-      if (dictionaryPage != null) {
-        writer.writeDictionaryPage(dictionaryPage);
-        encodings.add(dictionaryPage.getEncoding());
-      }
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<>(encodings));
-      writer.endColumn();
-      if (DEBUG) {
-        LOG.debug(
-            String.format(
-                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
-                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
-                + (dictionaryPage != null ? String.format(
-                ", dic { %,d entries, %,dB raw, %,dB comp}",
-                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
-                : ""));
-      }
-      encodings.clear();
-      pageCount = 0;
-    }
-
-    @Override
-    public long allocatedSize() {
-      return buf.getCapacity();
-    }
-
-    @Override
-    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-      if (this.dictionaryPage != null) {
-        throw new ParquetEncodingException("Only one dictionary page is allowed");
-      }
-      BytesInput dictionaryBytes = dictionaryPage.getBytes();
-      int uncompressedSize = (int)dictionaryBytes.size();
-      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
-      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
-    }
-
-    @Override
-    public String memUsageString(String prefix) {
-      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
-    }
-  }
-
-  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<>();
-  private final MessageType schema;
-  private final BytesCompressor compressor;
-  private final int initialSize;
-
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
-    this.compressor = compressor;
-    this.schema = schema;
-    this.initialSize = initialSize;
-  }
-
-  @Override
-  public PageWriter getPageWriter(ColumnDescriptor path) {
-    if (!writers.containsKey(path)) {
-      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSize));
-    }
-    return writers.get(path);
-  }
-
-  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
-    List<ColumnDescriptor> columns = schema.getColumns();
-    for (ColumnDescriptor columnDescriptor : columns) {
-      ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
-      pageWriter.writeToFileWriter(writer);
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
index 5beba14..1cc37dd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -20,33 +20,44 @@ package org.apache.tajo.storage.thirdparty.parquet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.PageReadStore;
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.UnmaterializableRecordCounter;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static java.lang.String.format;
-import static parquet.Log.DEBUG;
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 
+/**
+ * This class is borrowed from parquet-mr-1.8.1, but it is modified in order to progress.
+ */
 class InternalParquetRecordReader<T> {
   private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
 
-  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+  private ColumnIOFactory columnIOFactory = null;
+  private final Filter filter;
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
@@ -57,11 +68,11 @@ class InternalParquetRecordReader<T> {
 
   private T currentValue;
   private long total;
-  private int current = 0;
+  private long current = 0;
   private int currentBlock = -1;
   private ParquetFileReader reader;
-  private parquet.io.RecordReader<T> recordReader;
-  private UnboundRecordFilter recordFilter;
+  private org.apache.parquet.io.RecordReader<T> recordReader;
+  private boolean strictTypeChecking;
 
   private long totalTimeSpentReadingBytes;
   private long totalTimeSpentProcessingRecords;
@@ -70,37 +81,50 @@ class InternalParquetRecordReader<T> {
   private long totalCountLoadedSoFar = 0;
 
   private Path file;
+  private UnmaterializableRecordCounter unmaterializableRecordCounter;
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
+   * @param filter for filtering individual records
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
+    this.readSupport = readSupport;
+    this.filter = checkNotNull(filter, "filter");
+  }
 
   /**
    * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
    */
   public InternalParquetRecordReader(ReadSupport<T> readSupport) {
-    this(readSupport, null);
+    this(readSupport, FilterCompat.NOOP);
   }
 
   /**
    * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
    * @param filter Optional filter for only returning matching records.
+   * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)}
    */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter
-      filter) {
-    this.readSupport = readSupport;
-    this.recordFilter = filter;
+  @Deprecated
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+    this(readSupport, FilterCompat.get(filter));
   }
 
   private void checkRead() throws IOException {
     if (current == totalCountLoadedSoFar) {
       if (current != 0) {
-        long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
-        totalTimeSpentProcessingRecords += timeAssembling;
-        if (DEBUG) LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
-        long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
-        long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
-        long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
-        if (DEBUG) LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+        totalTimeSpentProcessingRecords += (System.currentTimeMillis() - startedAssemblingCurrentBlockAt);
+        if (Log.DEBUG) {
+          LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+          final long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+          if (totalTime != 0) {
+            final long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+            final long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+            LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+          }
+        }
       }
 
-      if (DEBUG) LOG.debug("at row " + current + ". reading next block");
+      if (Log.DEBUG) LOG.debug("at row " + current + ". reading next block");
       long t0 = System.currentTimeMillis();
       PageReadStore pages = reader.readNextRowGroup();
       if (pages == null) {
@@ -109,12 +133,10 @@ class InternalParquetRecordReader<T> {
       long timeSpentReading = System.currentTimeMillis() - t0;
       totalTimeSpentReadingBytes += timeSpentReading;
       BenchmarkCounter.incrementTime(timeSpentReading);
-      if (DEBUG) {
-        LOG.debug("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
-        LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
-      }
-      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
-      recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+      if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
+      recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
       ++ currentBlock;
@@ -122,7 +144,9 @@ class InternalParquetRecordReader<T> {
   }
 
   public void close() throws IOException {
-    reader.close();
+    if (reader != null) {
+      reader.close();
+    }
   }
 
   public Void getCurrentKey() throws IOException, InterruptedException {
@@ -138,24 +162,28 @@ class InternalParquetRecordReader<T> {
     return (float) current / total;
   }
 
-  public void initialize(MessageType requestedSchema, MessageType fileSchema,
-                         Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+  public void initialize(FileMetaData parquetFileMetadata,
                          Path file, List<BlockMetaData> blocks, Configuration configuration)
       throws IOException {
-    this.requestedSchema = requestedSchema;
-    this.fileSchema = fileSchema;
+    // initialize a ReadContext for this file
+    Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+        configuration, toSetMultiMap(fileMetadata), fileSchema));
+    this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+    this.requestedSchema = readContext.getRequestedSchema();
+    this.fileSchema = parquetFileMetadata.getSchema();
     this.file = file;
-    this.columnCount = this.requestedSchema.getPaths().size();
+    this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(
-        configuration, extraMetadata, fileSchema,
-        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
-
+        configuration, fileMetadata, fileSchema, readContext);
+    this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    reader = new ParquetFileReader(configuration, file, blocks, columns);
+    reader = new ParquetFileReader(configuration, parquetFileMetadata, file, blocks, columns);
     for (BlockMetaData block : blocks) {
       total += block.getRowCount();
     }
-    if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + total + " records.");
+    this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
+    LOG.info("RecordReader initialized will read a total of " + total + " records.");
   }
 
   private boolean contains(GroupType group, String[] path, int index) {
@@ -174,17 +202,55 @@ class InternalParquetRecordReader<T> {
   }
 
   public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (current < total) {
+    boolean recordFound = false;
+
+    while (!recordFound) {
+      // no more records left
+      if (current >= total) { return false; }
+
       try {
         checkRead();
-        currentValue = recordReader.read();
-        if (DEBUG) LOG.debug("read value: " + currentValue);
         current ++;
+
+        try {
+          currentValue = recordReader.read();
+        } catch (RecordMaterializationException e) {
+          // this might throw, but it's fatal if it does.
+          unmaterializableRecordCounter.incErrors(e);
+          if (DEBUG) LOG.debug("skipping a corrupt record");
+          continue;
+        }
+
+        if (recordReader.shouldSkipCurrentRecord()) {
+          // this record is being filtered via the filter2 package
+          if (DEBUG) LOG.debug("skipping record");
+          continue;
+        }
+
+        if (currentValue == null) {
+          // only happens with FilteredRecordReader at end of block
+          current = totalCountLoadedSoFar;
+          if (DEBUG) LOG.debug("filtered record reader reached end of block");
+          continue;
+        }
+
+        recordFound = true;
+
+        if (DEBUG) LOG.debug("read value: " + currentValue);
       } catch (RuntimeException e) {
         throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
       }
-      return true;
     }
-    return false;
+    return true;
+  }
+
+  private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
+    Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
+    for (Map.Entry<K, V> entry : map.entrySet()) {
+      Set<V> set = new HashSet<V>();
+      set.add(entry.getValue());
+      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+    }
+    return Collections.unmodifiableMap(setMultiMap);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
deleted file mode 100644
index da57745..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.lang.String.format;
-import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-
-class InternalParquetRecordWriter<T> {
-  private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
-
-  private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
-  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
-  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
-  private final ParquetFileWriter w;
-  private final WriteSupport<T> writeSupport;
-  private final MessageType schema;
-  private final Map<String, String> extraMetaData;
-  private final int blockSize;
-  private final int pageSize;
-  private final BytesCompressor compressor;
-  private final int dictionaryPageSize;
-  private final boolean enableDictionary;
-  private final boolean validating;
-  private final WriterVersion writerVersion;
-
-  private long recordCount = 0;
-  private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
-
-  private ColumnWriteStoreImpl store;
-  private ColumnChunkPageWriteStore pageStore;
-
-  /**
-   * @param w the file to write to
-   * @param writeSupport the class to convert incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to write in the footer of the file
-   * @param blockSize the size of a block in the file (this will be approximate)
-   * @param codec the codec used to compress
-   */
-  public InternalParquetRecordWriter(
-      ParquetFileWriter w,
-      WriteSupport<T> writeSupport,
-      MessageType schema,
-      Map<String, String> extraMetaData,
-      int blockSize,
-      int pageSize,
-      BytesCompressor compressor,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      WriterVersion writerVersion) {
-    this.w = w;
-    this.writeSupport = checkNotNull(writeSupport, "writeSupport");
-    this.schema = schema;
-    this.extraMetaData = extraMetaData;
-    this.blockSize = blockSize;
-    this.pageSize = pageSize;
-    this.compressor = compressor;
-    this.dictionaryPageSize = dictionaryPageSize;
-    this.enableDictionary = enableDictionary;
-    this.validating = validating;
-    this.writerVersion = writerVersion;
-    initStore();
-  }
-
-  private void initStore() {
-    // we don't want this number to be too small
-    // ideally we divide the block equally across the columns
-    // it is unlikely all columns are going to be the same size.
-    int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5);
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize);
-    // we don't want this number to be too small either
-    // ideally, slightly bigger than the page size, but not bigger than the block buffer
-    int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-    store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
-    MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
-    writeSupport.prepareForWrite(columnIO.getRecordWriter(store));
-  }
-
-  public void close() throws IOException, InterruptedException {
-    flushStore();
-    w.end(extraMetaData);
-  }
-
-  public void write(T value) throws IOException, InterruptedException {
-    writeSupport.write(value);
-    ++ recordCount;
-    checkBlockSizeReached();
-  }
-
-  private void checkBlockSizeReached() throws IOException {
-    if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
-      long memSize = store.memSize();
-      if (memSize > blockSize) {
-        if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
-        flushStore();
-        initStore();
-        recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
-      } else {
-        float recordSize = (float) memSize / recordCount;
-        recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
-            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
-        );
-        if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
-      }
-    }
-  }
-
-  public long getEstimatedWrittenSize() throws IOException {
-    return w.getPos() + store.memSize();
-  }
-
-  private void flushStore()
-      throws IOException {
-    if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
-    if (store.allocatedSize() > 3 * blockSize) {
-      LOG.warn("Too much memory used: " + store.memUsageString());
-    }
-    w.startBlock(recordCount);
-    store.flush();
-    pageStore.flushToFileWriter(w);
-    recordCount = 0;
-    w.endBlock();
-    store = null;
-    pageStore = null;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
deleted file mode 100644
index 7a32329..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import parquet.Log;
-import parquet.Version;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DictionaryPage;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.metadata.*;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.*;
-import java.util.Map.Entry;
-
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
-/**
- * Internal implementation of the Parquet file writer as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileWriter {
-  private static final Log LOG = Log.getLog(ParquetFileWriter.class);
-
-  public static final String PARQUET_METADATA_FILE = "_metadata";
-  public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
-  public static final int CURRENT_VERSION = 1;
-
-  private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-
-  private final MessageType schema;
-  private final FSDataOutputStream out;
-  private BlockMetaData currentBlock;
-  private ColumnChunkMetaData currentColumn;
-  private long currentRecordCount;
-  private List<BlockMetaData> blocks = new ArrayList<>();
-  private long uncompressedLength;
-  private long compressedLength;
-  private Set<parquet.column.Encoding> currentEncodings;
-
-  private CompressionCodecName currentChunkCodec;
-  private ColumnPath currentChunkPath;
-  private PrimitiveTypeName currentChunkType;
-  private long currentChunkFirstDataPage;
-  private long currentChunkDictionaryPageOffset;
-  private long currentChunkValueCount;
-
-  private Statistics currentStatistics;
-
-  /**
-   * Captures the order in which methods should be called
-   *
-   * @author Julien Le Dem
-   *
-   */
-  private enum STATE {
-    NOT_STARTED {
-      STATE start() {
-        return STARTED;
-      }
-    },
-    STARTED {
-      STATE startBlock() {
-        return BLOCK;
-      }
-      STATE end() {
-        return ENDED;
-      }
-    },
-    BLOCK  {
-      STATE startColumn() {
-        return COLUMN;
-      }
-      STATE endBlock() {
-        return STARTED;
-      }
-    },
-    COLUMN {
-      STATE endColumn() {
-        return BLOCK;
-      };
-      STATE write() {
-        return this;
-      }
-    },
-    ENDED;
-
-    STATE start() throws IOException { return error(); }
-    STATE startBlock() throws IOException { return error(); }
-    STATE startColumn() throws IOException { return error(); }
-    STATE write() throws IOException { return error(); }
-    STATE endColumn() throws IOException { return error(); }
-    STATE endBlock() throws IOException { return error(); }
-    STATE end() throws IOException { return error(); }
-
-    private final STATE error() throws IOException {
-      throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
-    }
-  }
-
-  private STATE state = STATE.NOT_STARTED;
-
-  /**
-   *
-   * @param configuration Configuration
-   * @param schema the schema of the data
-   * @param file the file to write to
-   * @throws java.io.IOException if the file can not be created
-   */
-  public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
-    super();
-    this.schema = schema;
-    FileSystem fs = file.getFileSystem(configuration);
-    this.out = fs.create(file, false);
-  }
-
-  /**
-   * start the file
-   * @throws java.io.IOException
-   */
-  public void start() throws IOException {
-    state = state.start();
-    if (DEBUG) LOG.debug(out.getPos() + ": start");
-    out.write(MAGIC);
-  }
-
-  /**
-   * start a block
-   * @param recordCount the record count in this block
-   * @throws java.io.IOException
-   */
-  public void startBlock(long recordCount) throws IOException {
-    state = state.startBlock();
-    if (DEBUG) LOG.debug(out.getPos() + ": start block");
-//    out.write(MAGIC); // TODO: add a magic delimiter
-    currentBlock = new BlockMetaData();
-    currentRecordCount = recordCount;
-  }
-
-  /**
-   * start a column inside a block
-   * @param descriptor the column descriptor
-   * @param valueCount the value count in this column
-   * @param statistics the statistics in this column
-   * @param compressionCodecName
-   * @throws java.io.IOException
-   */
-  public void startColumn(ColumnDescriptor descriptor,
-                          long valueCount,
-                          CompressionCodecName compressionCodecName) throws IOException {
-    state = state.startColumn();
-    if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
-    currentEncodings = new HashSet<>();
-    currentChunkPath = ColumnPath.get(descriptor.getPath());
-    currentChunkType = descriptor.getType();
-    currentChunkCodec = compressionCodecName;
-    currentChunkValueCount = valueCount;
-    currentChunkFirstDataPage = out.getPos();
-    compressedLength = 0;
-    uncompressedLength = 0;
-    // need to know what type of stats to initialize to
-    // better way to do this?
-    currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
-  }
-
-  /**
-   * writes a dictionary page page
-   * @param dictionaryPage the dictionary page
-   */
-  public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-    state = state.write();
-    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
-    currentChunkDictionaryPageOffset = out.getPos();
-    int uncompressedSize = dictionaryPage.getUncompressedSize();
-    int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
-    metadataConverter.writeDictionaryPageHeader(
-        uncompressedSize,
-        compressedPageSize,
-        dictionaryPage.getDictionarySize(),
-        dictionaryPage.getEncoding(),
-        out);
-    long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
-    this.uncompressedLength += uncompressedSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
-    dictionaryPage.getBytes().writeAllTo(out);
-    currentEncodings.add(dictionaryPage.getEncoding());
-  }
-
-
-  /**
-   * writes a single page
-   * @param valueCount count of values
-   * @param uncompressedPageSize the size of the data once uncompressed
-   * @param bytes the compressed data for the page without header
-   * @param rlEncoding encoding of the repetition level
-   * @param dlEncoding encoding of the definition level
-   * @param valuesEncoding encoding of values
-   */
-  @Deprecated
-  public void writeDataPage(
-      int valueCount, int uncompressedPageSize,
-      BytesInput bytes,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding) throws IOException {
-    state = state.write();
-    long beforeHeader = out.getPos();
-    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
-    int compressedPageSize = (int)bytes.size();
-    metadataConverter.writeDataPageHeader(
-        uncompressedPageSize, compressedPageSize,
-        valueCount,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding,
-        out);
-    long headerSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedPageSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
-    bytes.writeAllTo(out);
-    currentEncodings.add(rlEncoding);
-    currentEncodings.add(dlEncoding);
-    currentEncodings.add(valuesEncoding);
-  }
-
-  /**
-   * writes a single page
-   * @param valueCount count of values
-   * @param uncompressedPageSize the size of the data once uncompressed
-   * @param bytes the compressed data for the page without header
-   * @param rlEncoding encoding of the repetition level
-   * @param dlEncoding encoding of the definition level
-   * @param valuesEncoding encoding of values
-   */
-  public void writeDataPage(
-      int valueCount, int uncompressedPageSize,
-      BytesInput bytes,
-      Statistics statistics,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding) throws IOException {
-    state = state.write();
-    long beforeHeader = out.getPos();
-    if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
-    int compressedPageSize = (int)bytes.size();
-    metadataConverter.writeDataPageHeader(
-        uncompressedPageSize, compressedPageSize,
-        valueCount,
-        statistics,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding,
-        out);
-    long headerSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedPageSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
-    bytes.writeAllTo(out);
-    currentStatistics.mergeStatistics(statistics);
-    currentEncodings.add(rlEncoding);
-    currentEncodings.add(dlEncoding);
-    currentEncodings.add(valuesEncoding);
-  }
-
-  /**
-   * writes a number of pages at once
-   * @param bytes bytes to be written including page headers
-   * @param uncompressedTotalPageSize total uncompressed size (without page headers)
-   * @param compressedTotalPageSize total compressed size (without page headers)
-   * @throws java.io.IOException
-   */
-  void writeDataPages(BytesInput bytes,
-                      long uncompressedTotalPageSize,
-                      long compressedTotalPageSize,
-                      Statistics totalStats,
-                      List<parquet.column.Encoding> encodings) throws IOException {
-    state = state.write();
-    if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
-    long headersSize = bytes.size() - compressedTotalPageSize;
-    this.uncompressedLength += uncompressedTotalPageSize + headersSize;
-    this.compressedLength += compressedTotalPageSize + headersSize;
-    if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
-    bytes.writeAllTo(out);
-    currentEncodings.addAll(encodings);
-    currentStatistics = totalStats;
-  }
-
-  /**
-   * end a column (once all rep, def and data have been written)
-   * @throws java.io.IOException
-   */
-  public void endColumn() throws IOException {
-    state = state.endColumn();
-    if (DEBUG) LOG.debug(out.getPos() + ": end column");
-    currentBlock.addColumn(ColumnChunkMetaData.get(
-        currentChunkPath,
-        currentChunkType,
-        currentChunkCodec,
-        currentEncodings,
-        currentStatistics,
-        currentChunkFirstDataPage,
-        currentChunkDictionaryPageOffset,
-        currentChunkValueCount,
-        compressedLength,
-        uncompressedLength));
-    if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
-    currentColumn = null;
-    this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
-    this.uncompressedLength = 0;
-    this.compressedLength = 0;
-  }
-
-  /**
-   * ends a block once all column chunks have been written
-   * @throws java.io.IOException
-   */
-  public void endBlock() throws IOException {
-    state = state.endBlock();
-    if (DEBUG) LOG.debug(out.getPos() + ": end block");
-    currentBlock.setRowCount(currentRecordCount);
-    blocks.add(currentBlock);
-    currentBlock = null;
-  }
-
-  /**
-   * ends a file once all blocks have been written.
-   * closes the file.
-   * @param extraMetaData the extra meta data to write in the footer
-   * @throws java.io.IOException
-   */
-  public void end(Map<String, String> extraMetaData) throws IOException {
-    state = state.end();
-    if (DEBUG) LOG.debug(out.getPos() + ": end");
-    ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
-    serializeFooter(footer, out);
-    out.close();
-  }
-
-  private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
-    long footerIndex = out.getPos();
-    parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
-    writeFileMetaData(parquetMetadata, out);
-    if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
-    out.write(MAGIC);
-  }
-
-  /**
-   * writes a _metadata file
-   * @param configuration the configuration to use to get the FileSystem
-   * @param outputPath the directory to write the _metadata file to
-   * @param footers the list of footers to merge
-   * @throws java.io.IOException
-   */
-  public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
-    Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
-    FileSystem fs = outputPath.getFileSystem(configuration);
-    outputPath = outputPath.makeQualified(fs);
-    FSDataOutputStream metadata = fs.create(metaDataPath);
-    metadata.write(MAGIC);
-    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
-    serializeFooter(metadataFooter, metadata);
-    metadata.close();
-  }
-
-  private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
-    String rootPath = root.toString();
-    GlobalMetaData fileMetaData = null;
-    List<BlockMetaData> blocks = new ArrayList<>();
-    for (Footer footer : footers) {
-      String path = footer.getFile().toString();
-      if (!path.startsWith(rootPath)) {
-        throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root);
-      }
-      path = path.substring(rootPath.length());
-      while (path.startsWith("/")) {
-        path = path.substring(1);
-      }
-      fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
-      for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
-        block.setPath(path);
-        blocks.add(block);
-      }
-    }
-    return new ParquetMetadata(fileMetaData.merge(), blocks);
-  }
-
-  /**
-   * @return the current position in the underlying file
-   * @throws java.io.IOException
-   */
-  public long getPos() throws IOException {
-    return out.getPos();
-  }
-
-  /**
-   * Will merge the metadata of all the footers together
-   * @param footers the list files footers to merge
-   * @return the global meta data for all the footers
-   */
-  static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
-    GlobalMetaData fileMetaData = null;
-    for (Footer footer : footers) {
-      ParquetMetadata currentMetadata = footer.getParquetMetadata();
-      fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
-    }
-    return fileMetaData;
-  }
-
-  /**
-   * Will return the result of merging toMerge into mergedMetadata
-   * @param toMerge the metadata toMerge
-   * @param mergedMetadata the reference metadata to merge into
-   * @return the result of the merge
-   */
-  static GlobalMetaData mergeInto(
-      FileMetaData toMerge,
-      GlobalMetaData mergedMetadata) {
-    MessageType schema = null;
-    Map<String, Set<String>> newKeyValues = new HashMap<>();
-    Set<String> createdBy = new HashSet<>();
-    if (mergedMetadata != null) {
-      schema = mergedMetadata.getSchema();
-      newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
-      createdBy.addAll(mergedMetadata.getCreatedBy());
-    }
-    if ((schema == null && toMerge.getSchema() != null)
-        || (schema != null && !schema.equals(toMerge.getSchema()))) {
-      schema = mergeInto(toMerge.getSchema(), schema);
-    }
-    for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
-      Set<String> values = newKeyValues.get(entry.getKey());
-      if (values == null) {
-        values = new HashSet<>();
-        newKeyValues.put(entry.getKey(), values);
-      }
-      values.add(entry.getValue());
-    }
-    createdBy.add(toMerge.getCreatedBy());
-    return new GlobalMetaData(
-        schema,
-        newKeyValues,
-        createdBy);
-  }
-
-  /**
-   * will return the result of merging toMerge into mergedSchema
-   * @param toMerge the schema to merge into mergedSchema
-   * @param mergedSchema the schema to append the fields to
-   * @return the resulting schema
-   */
-  static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
-    if (mergedSchema == null) {
-      return toMerge;
-    }
-    return mergedSchema.union(toMerge);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
index c353a81..cb903b0 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
@@ -22,93 +22,112 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import parquet.filter.UnboundRecordFilter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.schema.MessageType;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.schema.MessageType;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
 
 /**
+ * This class is borrowed from parquet-mr-1.8.1, but it is modified in order to fix the PARQUET-363 and progress.
+ *
  * Read records from a Parquet file.
  */
 public class ParquetReader<T> implements Closeable {
 
-  private ReadSupport<T> readSupport;
-  private UnboundRecordFilter filter;
-  private Configuration conf;
-  private ReadContext readContext;
-  private Iterator<Footer> footersIterator;
+  private final ReadSupport<T> readSupport;
+  private final Configuration conf;
+  private final Iterator<Footer> footersIterator;
+  private final FilterCompat.Filter filter;
+
   private InternalParquetRecordReader<T> reader;
-  private GlobalMetaData globalMetaData;
+  private long totalRowCount;
 
   /**
    * @param file the file to read
    * @param readSupport to materialize records
-   * @throws java.io.IOException
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
+  @Deprecated
   public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
-    this(file, readSupport, null);
+    this(new Configuration(), file, readSupport, FilterCompat.NOOP);
   }
 
   /**
    * @param conf the configuration
    * @param file the file to read
    * @param readSupport to materialize records
-   * @throws java.io.IOException
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
+  @Deprecated
   public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
-    this(conf, file, readSupport, null);
+    this(conf, file, readSupport, FilterCompat.NOOP);
   }
 
   /**
    * @param file the file to read
    * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
-   * @throws java.io.IOException
+   * @param unboundRecordFilter the filter to use to filter records
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
-  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
-    this(new Configuration(), file, readSupport, filter);
+  @Deprecated
+  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    this(new Configuration(), file, readSupport, FilterCompat.get(unboundRecordFilter));
   }
 
   /**
    * @param conf the configuration
    * @param file the file to read
    * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
-   * @throws java.io.IOException
+   * @param unboundRecordFilter the filter to use to filter records
+   * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+  @Deprecated
+  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    this(conf, file, readSupport, FilterCompat.get(unboundRecordFilter));
+  }
+
+  private ParquetReader(Configuration conf,
+                        Path file,
+                        ReadSupport<T> readSupport,
+                        Filter filter) throws IOException {
     this.readSupport = readSupport;
-    this.filter = filter;
+    this.filter = checkNotNull(filter, "filter");
     this.conf = conf;
 
     FileSystem fs = file.getFileSystem(conf);
-    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, HiddenFileFilter.INSTANCE));
+    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
     this.footersIterator = footers.iterator();
-    globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
 
-    List<BlockMetaData> blocks = new ArrayList<>();
     for (Footer footer : footers) {
-      blocks.addAll(footer.getParquetMetadata().getBlocks());
+      for(BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+        totalRowCount += block.getRowCount();
+      }
     }
-
-    MessageType schema = globalMetaData.getSchema();
-    Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
-    readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
   }
 
   /**
    * @return the next record or null if finished
-   * @throws java.io.IOException
+   * @throws IOException
    */
   public T read() throws IOException {
     try {
@@ -130,10 +149,17 @@ public class ParquetReader<T> implements Closeable {
     }
     if (footersIterator.hasNext()) {
       Footer footer = footersIterator.next();
-      reader = new InternalParquetRecordReader<>(readSupport, filter);
-      reader.initialize(
-          readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
-          readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+
+      List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
+
+      MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
+
+      List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
+          filter, blocks, fileSchema);
+
+      reader = new InternalParquetRecordReader<T>(readSupport, filter);
+      reader.initialize(footer.getParquetMetadata().getFileMetaData(),
+          footer.getFile(), filteredBlocks, conf);
     }
   }
 
@@ -145,10 +171,64 @@ public class ParquetReader<T> implements Closeable {
   }
 
   public float getProgress() {
-    if (!footersIterator.hasNext()) {
-      return 1.0f;
+    if (reader != null) {
+      return reader.getProgress();
     } else {
-      return reader != null ? reader.getProgress() : 0.0f;
+      if (!footersIterator.hasNext()) {
+        return 1.0f;
+      } else {
+        return 0.0f;
+      }
+    }
+  }
+
+  public long getTotalRowCount() {
+    return totalRowCount;
+  }
+
+  public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
+    return new Builder<T>(readSupport, path);
+  }
+
+  public static class Builder<T> {
+    private final ReadSupport<T> readSupport;
+    private final Path file;
+    private Filter filter;
+    protected Configuration conf;
+
+    private Builder(ReadSupport<T> readSupport, Path path) {
+      this.readSupport = checkNotNull(readSupport, "readSupport");
+      this.file = checkNotNull(path, "path");
+      this.conf = new Configuration();
+      this.filter = FilterCompat.NOOP;
+    }
+
+    protected Builder(Path path) {
+      this.readSupport = null;
+      this.file = checkNotNull(path, "path");
+      this.conf = new Configuration();
+      this.filter = FilterCompat.NOOP;
+    }
+
+    public Builder<T> withConf(Configuration conf) {
+      this.conf = checkNotNull(conf, "conf");
+      return this;
+    }
+
+    public Builder<T> withFilter(Filter filter) {
+      this.filter = checkNotNull(filter, "filter");
+      return this;
+    }
+
+    protected ReadSupport<T> getReadSupport() {
+      // if readSupport is null, the protected constructor must have been used
+      Preconditions.checkArgument(readSupport != null,
+          "[BUG] Classes that extend Builder should override getReadSupport()");
+      return readSupport;
+    }
+
+    public ParquetReader<T> build() throws IOException {
+      return new ParquetReader<T>(conf, file, getReadSupport(), filter);
     }
   }
 }