You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/10/01 10:20:40 UTC

git commit: TAJO-1044: Implement nextFetch(RowBlock) of Parquer scanner.

Repository: tajo
Updated Branches:
  refs/heads/block_iteration 95af3cce9 -> fade0a87b


TAJO-1044: Implement nextFetch(RowBlock) of Parquer scanner.

Closes #172


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fade0a87
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fade0a87
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fade0a87

Branch: refs/heads/block_iteration
Commit: fade0a87be6beba1cd24838d61bf000948454e96
Parents: 95af3cc
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Oct 1 01:18:47 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Oct 1 01:18:47 2014 -0700

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |  20 +-
 .../src/main/proto/CatalogProtos.proto          |   5 +-
 .../TestCTASQuery/CtasWithManagedTable.sql      |   2 +-
 .../org/apache/tajo/storage/FileScanner.java    |   2 +-
 .../org/apache/tajo/storage/MergeScanner.java   |   2 +-
 .../org/apache/tajo/storage/NullScanner.java    |   9 +
 .../storage/parquet/TajoRecordConverter.java    |  57 ++-
 .../storage/parquet/TajoSchemaConverter.java    |   6 +-
 .../tajo/storage/parquet/TajoWriteSupport.java  |   5 +-
 .../tuple/offheap/HeapTupleBytesComparator.java |   1 +
 .../hadoop/ParquetRowBlockConverter.java        | 353 +++++++++++++++++++
 .../hadoop/ParquetRowBlockParquetReader.java    | 234 ++++++++++++
 .../parquet/hadoop/ParquetRowBlockScanner.java  | 126 +++++++
 .../parquet/hadoop/ParquetRowDirectReader.java  | 185 ++++++++++
 .../parquet/hadoop/ParquetRowMaterializer.java  |  74 ++++
 .../parquet/hadoop/ParquetRowReadSupport.java   | 107 ++++++
 .../src/main/resources/storage-default.xml      |  18 +-
 .../apache/tajo/storage/TestNextFetches.java    | 299 ++++------------
 .../storage/parquet/TestSchemaConverter.java    |   2 +-
 .../src/test/resources/storage-default.xml      |  18 +-
 21 files changed, 1269 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 40b62de..cab6966 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,8 @@ Tajo Change Log
 
 Block Iteration - branch
 
+    TAJO-1044: Implement nextFetch(RowBlock) of Parquer scanner. (hyunsik)
+
     TAJO-1084: Generated classes should access directly UnSafeTuple and 
     RowWriter. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index a2d9796..96dac62 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -277,25 +277,7 @@ public class CatalogUtil {
   }
 
   public static StoreType getStoreType(final String typeStr) {
-    if (typeStr.equalsIgnoreCase(StoreType.CSV.name())) {
-      return StoreType.CSV;
-    } else if (typeStr.equalsIgnoreCase(StoreType.RAW.name())) {
-      return StoreType.RAW;
-    } else if (typeStr.equalsIgnoreCase(StoreType.ROWFILE.name())) {
-      return StoreType.ROWFILE;
-    } else if (typeStr.equalsIgnoreCase(StoreType.RCFILE.name())) {
-      return StoreType.RCFILE;
-    } else if (typeStr.equalsIgnoreCase(StoreType.TREVNI.name())) {
-      return StoreType.TREVNI;
-    } else if (typeStr.equalsIgnoreCase(StoreType.PARQUET.name())) {
-      return StoreType.PARQUET;
-    } else if (typeStr.equalsIgnoreCase(StoreType.SEQUENCEFILE.name())) {
-      return StoreType.SEQUENCEFILE;
-    } else if (typeStr.equalsIgnoreCase(StoreType.AVRO.name())) {
-      return StoreType.AVRO;
-    } else {
-      return null;
-    }
+    return StoreType.valueOf(typeStr.toUpperCase());
   }
 
   public static TableMeta newTableMeta(StoreType type) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index c45e1ef..2cfc1a8 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -35,8 +35,9 @@ enum StoreType {
   HCFILE = 6;
   TREVNI = 7;
   PARQUET = 8;
-  SEQUENCEFILE = 9;
-  AVRO = 10;
+  BLOCK_PARQUET = 9;
+  SEQUENCEFILE = 10;
+  AVRO = 11;
 }
 
 enum OrderType {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
index 1dd5e90..ce6ebdb 100644
--- a/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
+++ b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
@@ -1,4 +1,4 @@
-create table "MANAGED_TABLE1" (col1 float, col2 float) using rcfile as
+create table "MANAGED_TABLE1" (col1 float, col2 float) using parquet as
 select
   sum(l_orderkey) as total1,
   avg(l_partkey) as total2

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index d4357e3..0c6bd48 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -84,7 +84,7 @@ public abstract class FileScanner implements Scanner {
 
   @Override
   public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
-    throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
+    throw new UnimplementedException(getClass().getSimpleName() + "::nextFetch(OffHeapRowBlock) is not implemented");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 890455a..9fdc483 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -117,7 +117,7 @@ public class MergeScanner implements Scanner {
 
   @Override
   public boolean nextFetch(OffHeapRowBlock rowBlock) {
-    throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
+    throw new UnimplementedException("MergeScanner::nextFetch(OffHeapRowBlock) is not implemented.");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
index 4cec67d..56908ed 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -19,7 +19,9 @@ package org.apache.tajo.storage; /**
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
 
 import java.io.IOException;
 
@@ -36,6 +38,13 @@ public class NullScanner extends FileScanner {
   }
 
   @Override
+  public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+    progress = 1.0f;
+
+    return false;
+  }
+
+  @Override
   public void reset() throws IOException {
     progress = 0.0f;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7c3d79d..4036ddd 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -117,12 +117,18 @@ public class TajoRecordConverter extends GroupConverter {
         return new FieldFloat4Converter(parent);
       case FLOAT8:
         return new FieldFloat8Converter(parent);
+      case TIMESTAMP:
+        return new FieldTimestampConverter(parent);
+      case TIME:
+        return new FieldTimeConverter(parent);
+      case DATE:
+        return new FieldDateConverter(parent);
+      case TEXT:
+        return new FieldTextConverter(parent);
       case INET4:
         return new FieldInet4Converter(parent);
       case INET6:
         throw new RuntimeException("No converter for INET6");
-      case TEXT:
-        return new FieldTextConverter(parent);
       case PROTOBUF:
         return new FieldProtobufConverter(parent, dataType);
       case BLOB:
@@ -209,7 +215,7 @@ public class TajoRecordConverter extends GroupConverter {
 
     @Override
     final public void addInt(int value) {
-      parent.add(DatumFactory.createBit((byte)(value & 0xff)));
+      parent.add(DatumFactory.createBit((byte) (value & 0xff)));
     }
   }
 
@@ -235,7 +241,7 @@ public class TajoRecordConverter extends GroupConverter {
 
     @Override
     final public void addInt(int value) {
-      parent.add(DatumFactory.createInt2((short)value));
+      parent.add(DatumFactory.createInt2((short) value));
     }
   }
 
@@ -329,8 +335,8 @@ public class TajoRecordConverter extends GroupConverter {
     }
 
     @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createInet4(value.getBytes()));
+    public void addInt(int value) {
+      parent.add(DatumFactory.createInet4(value));
     }
   }
 
@@ -360,6 +366,45 @@ public class TajoRecordConverter extends GroupConverter {
     }
   }
 
+  static final class FieldTimestampConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldTimestampConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void addLong(long value) {
+      parent.add(DatumFactory.createTimestamp(value));
+    }
+  }
+
+  static final class FieldTimeConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldTimeConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void addLong(long value) {
+      parent.add(DatumFactory.createTime(value));
+    }
+  }
+
+  static final class FieldDateConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldDateConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public void addInt(int value) {
+      parent.add(DatumFactory.createDate(value));
+    }
+  }
+
   static final class FieldProtobufConverter extends PrimitiveConverter {
     private final ParentValueContainer parent;
     private final DataType dataType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
index 2592231..cfca618 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
@@ -161,9 +161,14 @@ public class TajoSchemaConverter {
       case BIT:
       case INT2:
       case INT4:
+      case DATE:
+      case INET4:
         return primitive(column.getSimpleName(),
                          PrimitiveType.PrimitiveTypeName.INT32);
       case INT8:
+      case TIMESTAMP:
+      case TIME:
+
         return primitive(column.getSimpleName(),
                          PrimitiveType.PrimitiveTypeName.INT64);
       case FLOAT4:
@@ -183,7 +188,6 @@ public class TajoSchemaConverter {
       case BLOB:
         return primitive(column.getSimpleName(),
                          PrimitiveType.PrimitiveTypeName.BINARY);
-      case INET4:
       case INET6:
         return primitive(column.getSimpleName(),
                          PrimitiveType.PrimitiveTypeName.BINARY);

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index 35165de..c0164d1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -121,9 +121,13 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
       case BIT:
       case INT2:
       case INT4:
+      case INET4:
+      case DATE:
         recordConsumer.addInteger(datum.asInt4());
         break;
       case INT8:
+      case TIMESTAMP:
+      case TIME:
         recordConsumer.addLong(datum.asInt8());
         break;
       case FLOAT4:
@@ -138,7 +142,6 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
         break;
       case PROTOBUF:
       case BLOB:
-      case INET4:
       case INET6:
         recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
         break;

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
index 5298286..39ce930 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
@@ -56,6 +56,7 @@ public class HeapTupleBytesComparator {
     for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
       long lw = UNSAFE.getLong(t1.data, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET + offset1);
       long rw = UNSAFE.getLong(t2.data, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET + offset2);
+
       long diff = lw ^ rw;
 
       if (diff != 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java
new file mode 100644
index 0000000..5129ea3
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java
@@ -0,0 +1,353 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
+
+/**
+ * Converter to convert a Parquet record into a Tajo Tuple.
+ */
+public class ParquetRowBlockConverter extends GroupConverter {
+  private final GroupType parquetSchema;
+  private final Schema tajoReadSchema;
+  private final int[] projectionMap;
+  private final int tupleSize;
+
+  private final Converter[] converters;
+
+  private Object [] currentTuple;
+
+  /**
+   * Creates a new TajoRecordConverter.
+   *
+   * @param parquetSchema The Parquet schema of the projection.
+   * @param tajoReadSchema The Tajo schema of the table.
+   * @param projectionMap An array mapping the projection column to the column
+   *                      index in the table.
+   */
+  public ParquetRowBlockConverter(GroupType parquetSchema, Schema tajoReadSchema, int[] projectionMap) {
+    this.parquetSchema = parquetSchema;
+    this.tajoReadSchema = tajoReadSchema;
+    this.projectionMap = projectionMap;
+    this.tupleSize = tajoReadSchema.size();
+
+    // The projectionMap.length does not match parquetSchema.getFieldCount()
+    // when the projection contains NULL_TYPE columns. We will skip over the
+    // NULL_TYPE columns when we construct the converters and populate the
+    // NULL_TYPE columns with NullDatums in start().
+    int index = 0;
+    this.converters = new Converter[parquetSchema.getFieldCount()];
+    for (int i = 0; i < projectionMap.length; ++i) {
+      final int projectionIndex = projectionMap[i];
+      Column column = tajoReadSchema.getColumn(projectionIndex);
+      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
+        continue;
+      }
+      Type type = parquetSchema.getType(index);
+      converters[index] = newConverter(column, type, new ParentValueContainer() {
+        @Override
+        void add(Object value) {
+          ParquetRowBlockConverter.this.set(projectionIndex, value);
+        }
+      });
+      ++index;
+    }
+  }
+
+  private void set(int index, Object value) {
+    currentTuple[index] = value;
+  }
+
+  private Converter newConverter(Column column, Type type,
+                                 ParentValueContainer parent) {
+    DataType dataType = column.getDataType();
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        return new FieldBooleanConverter(parent);
+      case CHAR:
+        return new FieldCharConverter(parent);
+      case INT2:
+        return new FieldInt2Converter(parent);
+      case INT4:
+      case INET4:
+      case DATE:
+        return new FieldInt4Converter(parent);
+      case INT8:
+      case TIMESTAMP:
+      case TIME:
+        return new FieldInt8Converter(parent);
+      case FLOAT4:
+        return new FieldFloat4Converter(parent);
+      case FLOAT8:
+        return new FieldFloat8Converter(parent);
+      case INET6:
+        throw new RuntimeException("No converter for INET6");
+      case TEXT:
+        return new FieldTextConverter(parent);
+      case PROTOBUF:
+        return new FieldProtobufConverter(parent, dataType);
+      case BLOB:
+        return new FieldBlobConverter(parent);
+      case NULL_TYPE:
+        throw new RuntimeException("No converter for NULL_TYPE.");
+      default:
+        throw new RuntimeException("Unsupported data type");
+    }
+  }
+
+  /**
+   * Gets the converter for a specific field.
+   *
+   * @param fieldIndex Index of the field in the projection.
+   * @return The converter for the field.
+   */
+  @Override
+  public Converter getConverter(int fieldIndex) {
+    return converters[fieldIndex];
+  }
+
+  /**
+   * Called before processing fields. This method fills any fields that have
+   * NULL values or have type NULL_TYPE with a NullDatum.
+   */
+  @Override
+  public void start() {
+    currentTuple = new Object[tupleSize];
+  }
+
+  /**
+   * Called after all fields have been processed.
+   */
+  @Override
+  public void end() {
+  }
+
+  /**
+   * Returns the current record converted by this converter.
+   *
+   * @return The current record.
+   */
+  public Object [] getCurrentRecord() {
+    return currentTuple;
+  }
+
+  static abstract class ParentValueContainer {
+    /**
+     * Adds the value to the parent.
+     *
+     * @param value The value to add.
+     */
+    abstract void add(Object value);
+  }
+
+  static final class FieldBooleanConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldBooleanConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addBoolean(boolean value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldCharConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldCharConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    public void addBinary(parquet.io.api.Binary value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldInt2Converter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldInt2Converter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add((short)value);
+    }
+  }
+
+  static final class FieldInt4Converter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldInt4Converter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldInt8Converter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldInt8Converter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.add(value);
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldFloat4Converter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldFloat4Converter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(value);
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.add(value);
+    }
+
+    @Override
+    final public void addFloat(float value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldFloat8Converter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldFloat8Converter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addInt(int value) {
+      parent.add(value);
+    }
+
+    @Override
+    final public void addLong(long value) {
+      parent.add(value);
+    }
+
+    @Override
+    final public void addFloat(float value) {
+      parent.add(value);
+    }
+
+    @Override
+    final public void addDouble(double value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldInet4Converter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldInet4Converter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldTextConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldTextConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldBlobConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+
+    public FieldBlobConverter(ParentValueContainer parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      parent.add(value);
+    }
+  }
+
+  static final class FieldProtobufConverter extends PrimitiveConverter {
+    private final ParentValueContainer parent;
+    private final DataType dataType;
+
+    public FieldProtobufConverter(ParentValueContainer parent,
+                                  DataType dataType) {
+      this.parent = parent;
+      this.dataType = dataType;
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      try {
+        ProtobufDatumFactory factory =
+            ProtobufDatumFactory.get(dataType.getCode());
+        Message.Builder builder = factory.newBuilder();
+        builder.mergeFrom(value.getBytes());
+        parent.add(factory.createDatum(builder));
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java
new file mode 100644
index 0000000..24920a6
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java
@@ -0,0 +1,234 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.RowWriter;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.GlobalMetaData;
+import parquet.io.api.Binary;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+public class ParquetRowBlockParquetReader implements Closeable {
+
+  private ReadSupport<Object []> readSupport;
+  private UnboundRecordFilter filter;
+  private Configuration conf;
+  private ReadSupport.ReadContext readContext;
+  private Iterator<Footer> footersIterator;
+  private ParquetRowDirectReader reader;
+  private GlobalMetaData globalMetaData;
+  private final int [] projectedMap;
+
+  public ParquetRowBlockParquetReader(Path file, Schema schema, Schema target) throws IOException {
+    this(file, new ParquetRowReadSupport(schema, target));
+  }
+
+  /**
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @throws java.io.IOException
+   */
+  public ParquetRowBlockParquetReader(Path file, ParquetRowReadSupport readSupport) throws IOException {
+    this(file, readSupport, null);
+  }
+
+  /**
+   * @param conf the configuration
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @throws java.io.IOException
+   */
+  public ParquetRowBlockParquetReader(Configuration conf, Path file, ParquetRowReadSupport readSupport) throws IOException {
+    this(conf, file, readSupport, null);
+  }
+
+  /**
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @param filter the filter to use to filter records
+   * @throws java.io.IOException
+   */
+  private ParquetRowBlockParquetReader(Path file, ParquetRowReadSupport readSupport, UnboundRecordFilter filter)
+      throws IOException {
+    this(new Configuration(), file, readSupport, filter);
+  }
+
+  /**
+   * @param conf the configuration
+   * @param file the file to read
+   * @param readSupport to materialize records
+   * @param filter the filter to use to filter records
+   * @throws java.io.IOException
+   */
+  public ParquetRowBlockParquetReader(Configuration conf, Path file, ParquetRowReadSupport readSupport,
+                                      UnboundRecordFilter filter) throws IOException {
+
+    this.readSupport = readSupport;
+    this.filter = filter;
+    this.conf = conf;
+
+    FileSystem fs = file.getFileSystem(conf);
+    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
+    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+    this.footersIterator = footers.iterator();
+    globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
+
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+    for (Footer footer : footers) {
+      blocks.addAll(footer.getParquetMetadata().getBlocks());
+    }
+
+    MessageType schema = globalMetaData.getSchema();
+    Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
+    readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
+
+    this.tajoSchema = readSupport.getSchema();
+    this.columnNum = tajoSchema.size();
+    tajoTypes = new TajoDataTypes.Type[columnNum];
+    for (int i = 0; i < columnNum; i++) {
+      tajoTypes[i] = tajoSchema.getColumn(i).getDataType().getType();
+    }
+
+    projectedMap = new int[readSupport.getTargetSchema().size()];
+    for (int i = 0; i < readSupport.getTargetSchema().size(); i++) {
+      projectedMap[i] = tajoSchema.getColumnId(readSupport.getTargetSchema().getColumn(i).getQualifiedName());
+    }
+  }
+
+  Schema tajoSchema;
+  int columnNum;
+  TajoDataTypes.Type [] tajoTypes;
+
+  /**
+   * @return the next record or null if finished
+   * @throws java.io.IOException
+   */
+  public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+    rowBlock.clear();
+
+    try {
+      if (reader != null) {
+        RowWriter writer = rowBlock.getWriter();
+
+        while(rowBlock.rows() < rowBlock.maxRowNum() && reader.nextKeyValue()) {
+
+          writer.startRow();
+          int prevId = -1;
+          Object [] values = reader.getCurrentValue();
+          for (int columnIdx = 0; columnIdx < projectedMap.length; columnIdx++) {
+            int actualId = projectedMap[columnIdx];
+
+            if (actualId - prevId > 1) {
+              writer.skipField((actualId - prevId) - 1);
+            }
+
+            if (values[actualId] != null) {
+              switch (tajoTypes[actualId]) {
+              case BOOLEAN:
+                writer.putBool((Boolean) values[actualId]);
+                break;
+              case CHAR:
+                writer.putText(((Binary) values[actualId]).getBytes());
+                break;
+              case INT1:
+              case INT2:
+                writer.putInt2((Short) values[actualId]);
+                break;
+              case INT4:
+              case INET4:
+              case DATE:
+                 writer.putInt4((Integer) values[actualId]);
+                break;
+              case INT8:
+              case TIMESTAMP:
+              case TIME:
+                writer.putInt8((Long) values[actualId]);
+                break;
+              case FLOAT4:
+                writer.putFloat4((Float) values[actualId]);
+                break;
+              case FLOAT8:
+                writer.putFloat8((Double) values[actualId]);
+                break;
+              case TEXT:
+                writer.putText(((Binary) values[actualId]).getBytes());
+                break;
+              case BLOB:
+                writer.putBlob(((Binary) values[actualId]).getBytes());
+                break;
+
+              default:
+                throw new IOException("Not supported type: " + tajoTypes[actualId].name());
+              }
+            } else {
+              writer.skipField();
+            }
+
+            prevId = actualId;
+          }
+
+          writer.endRow();
+        }
+
+        return rowBlock.rows() > 0;
+      } else {
+        initReader();
+        return reader == null ? null : nextFetch(rowBlock);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void initReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+    if (footersIterator.hasNext()) {
+      Footer footer = footersIterator.next();
+      reader = new ParquetRowDirectReader(readSupport, filter);
+      reader.initialize(
+          readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData()
+              .getKeyValueMetaData(),
+          readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java
new file mode 100644
index 0000000..b771cef
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java
@@ -0,0 +1,126 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+
+import java.io.IOException;
+
+/**
+ * FileScanner for reading Parquet files
+ */
+public class ParquetRowBlockScanner extends FileScanner {
+  private ParquetRowBlockParquetReader reader;
+
+  /**
+   * Creates a new ParquetScanner.
+   *
+   * @param conf
+   * @param schema
+   * @param meta
+   * @param fragment
+   */
+  public ParquetRowBlockScanner(Configuration conf, final Schema schema,
+                                final TableMeta meta, final FileFragment fragment) {
+    super(conf, schema, meta, fragment);
+  }
+
+  /**
+   * Initializes the ParquetScanner. This method initializes the
+   * TajoParquetReader.
+   */
+  @Override
+  public void init() throws IOException {
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+    reader = new ParquetRowBlockParquetReader(fragment.getPath(), schema, new Schema(targets));
+    super.init();
+  }
+
+  /**
+   * Reads the next Tuple from the Parquet file.
+   *
+   * @return The next Tuple from the Parquet file or null if end of file is
+   *         reached.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    throw new UnimplementedException("next() is not implemented.");
+  }
+
+  @Override
+  public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+    return reader.nextFetch(rowBlock);
+  }
+
+  /**
+   * Resets the scanner
+   */
+  @Override
+  public void reset() throws IOException {
+  }
+
+  /**
+   * Closes the scanner.
+   */
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  /**
+   * Returns whether this scanner is projectable.
+   *
+   * @return true
+   */
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  /**
+   * Returns whether this scanner is selectable.
+   *
+   * @return false
+   */
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  /**
+   * Returns whether this scanner is splittable.
+   *
+   * @return false
+   */
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java
new file mode 100644
index 0000000..0fda072
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java
@@ -0,0 +1,185 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.PageReadStore;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static parquet.Log.DEBUG;
+
+public class ParquetRowDirectReader {
+  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
+
+  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+
+  private MessageType requestedSchema;
+  private MessageType fileSchema;
+  private int columnCount;
+  private final ReadSupport<Object []> readSupport;
+
+  private RecordMaterializer<Object []> recordConverter;
+
+  private Object [] currentValue;
+  private long total;
+  private int current = 0;
+  private int currentBlock = -1;
+  private ParquetFileReader reader;
+  private parquet.io.RecordReader<Object []> recordReader;
+  private UnboundRecordFilter recordFilter;
+
+  private long totalTimeSpentReadingBytes;
+  private long totalTimeSpentProcessingRecords;
+  private long startedAssemblingCurrentBlockAt;
+
+  private long totalCountLoadedSoFar = 0;
+
+  private Path file;
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
+   */
+  public ParquetRowDirectReader(ReadSupport<Object[]> readSupport) {
+    this(readSupport, null);
+  }
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
+   * @param filter Optional filter for only returning matching records.
+   */
+  public ParquetRowDirectReader(ReadSupport<Object[]> readSupport, UnboundRecordFilter filter) {
+    this.readSupport = readSupport;
+    this.recordFilter = filter;
+  }
+
+  private void checkRead() throws IOException {
+    if (current == totalCountLoadedSoFar) {
+      if (current != 0) {
+        long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
+        totalTimeSpentProcessingRecords += timeAssembling;
+        LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in "
+            + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords)
+            + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+        long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+        long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+        long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+        LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " +
+            percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+      }
+
+      LOG.info("at row " + current + ". reading next block");
+      long t0 = System.currentTimeMillis();
+      PageReadStore pages = reader.readNextRowGroup();
+      if (pages == null) {
+        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
+      }
+      long timeSpentReading = System.currentTimeMillis() - t0;
+      totalTimeSpentReadingBytes += timeSpentReading;
+      BenchmarkCounter.incrementTime(timeSpentReading);
+      LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
+      recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+      startedAssemblingCurrentBlockAt = System.currentTimeMillis();
+      totalCountLoadedSoFar += pages.getRowCount();
+      ++ currentBlock;
+    }
+  }
+
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  public Object [] getCurrentValue() throws IOException, InterruptedException {
+    return currentValue;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return (float) current / total;
+  }
+
+  public void initialize(MessageType requestedSchema, MessageType fileSchema,
+                         Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+                         Path file, List<BlockMetaData> blocks, Configuration configuration)
+      throws IOException {
+    this.requestedSchema = requestedSchema;
+    this.fileSchema = fileSchema;
+    this.file = file;
+    this.columnCount = this.requestedSchema.getPaths().size();
+    this.recordConverter = readSupport.prepareForRead(
+        configuration, extraMetadata, fileSchema,
+        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
+
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    reader = new ParquetFileReader(configuration, file, blocks, columns);
+    for (BlockMetaData block : blocks) {
+      total += block.getRowCount();
+    }
+    LOG.info("RecordReader initialized will read a total of " + total + " records.");
+  }
+
+  private boolean contains(GroupType group, String[] path, int index) {
+    if (index == path.length) {
+      return false;
+    }
+    if (group.containsField(path[index])) {
+      Type type = group.getType(path[index]);
+      if (type.isPrimitive()) {
+        return index + 1 == path.length;
+      } else {
+        return contains(type.asGroupType(), path, index + 1);
+      }
+    }
+    return false;
+  }
+
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (current < total) {
+      try {
+        checkRead();
+        currentValue = recordReader.read();
+        if (DEBUG) LOG.debug("read value: " + currentValue);
+        current ++;
+      } catch (RuntimeException e) {
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current,
+            currentBlock, file), e);
+      }
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java
new file mode 100644
index 0000000..212e805
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java
@@ -0,0 +1,74 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * Materializes a Tajo Tuple from a stream of Parquet data.
+ */
+class ParquetRowMaterializer extends RecordMaterializer<Object []> {
+  private final ParquetRowBlockConverter root;
+
+  /**
+   * Creates a new TajoRecordMaterializer.
+   *
+   * @param parquetSchema The Parquet schema of the projection.
+   * @param tajoSchema The Tajo schema of the projection.
+   * @param tajoReadSchema The Tajo schema of the table.
+   */
+  public ParquetRowMaterializer(MessageType parquetSchema, Schema tajoSchema, Schema tajoReadSchema) {
+    int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
+    this.root = new ParquetRowBlockConverter(parquetSchema, tajoReadSchema, projectionMap);
+  }
+
+  private int[] getProjectionMap(Schema schema, Schema projection) {
+    Column[] targets = projection.toArray();
+    int[] projectionMap = new int[targets.length];
+    for (int i = 0; i < targets.length; ++i) {
+      int tid = schema.getColumnId(targets[i].getQualifiedName());
+      projectionMap[i] = tid;
+    }
+    return projectionMap;
+  }
+
+  /**
+   * Returns the current record being materialized.
+   *
+   * @return The record being materialized.
+   */
+  @Override
+  public Object [] getCurrentRecord() {
+    return root.getCurrentRecord();
+  }
+
+  /**
+   * Returns the root converter.
+   *
+   * @return The root converter
+   */
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java
new file mode 100644
index 0000000..0e09d3f
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java
@@ -0,0 +1,107 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.parquet.TajoSchemaConverter;
+import parquet.Log;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.Map;
+
+/**
+ * Tajo implementation of {@link parquet.hadoop.api.ReadSupport} for {@link org.apache.tajo.storage.Tuple}s.
+ * Users should use {@link org.apache.tajo.storage.parquet.ParquetScanner} and not this class directly.
+ */
+public class ParquetRowReadSupport extends ReadSupport<Object []> {
+  private static final Log LOG = Log.getLog(ParquetRowReadSupport.class);
+
+  private Schema readSchema;
+  private Schema requestedSchema;
+
+  /**
+   * Creates a new TajoReadSupport.
+   *
+   * @param requestedSchema The Tajo schema of the requested projection passed
+   *        down by ParquetScanner.
+   */
+  public ParquetRowReadSupport(Schema readSchema, Schema requestedSchema) {
+    super();
+    this.readSchema = readSchema;
+    this.requestedSchema = requestedSchema;
+  }
+
+  /**
+   * Creates a new TajoReadSupport.
+   *
+   * @param readSchema The schema of the table.
+   */
+  public ParquetRowReadSupport(Schema readSchema) {
+    super();
+    this.readSchema = readSchema;
+    this.requestedSchema = readSchema;
+  }
+
+  /**
+   * Initializes the ReadSupport.
+   *
+   * @param context The InitContext.
+   * @return A ReadContext that defines how to read the file.
+   */
+  @Override
+  public ReadContext init(InitContext context) {
+    if (requestedSchema == null) {
+      throw new RuntimeException("requestedSchema is null.");
+    }
+    MessageType requestedParquetSchema =
+      new TajoSchemaConverter().convert(requestedSchema);
+    LOG.debug("Reading data with projection:\n" + requestedParquetSchema);
+    return new ReadContext(requestedParquetSchema);
+  }
+
+  /**
+   * Prepares for read.
+   *
+   * @param configuration The job configuration.
+   * @param keyValueMetaData App-specific metadata from the file.
+   * @param fileSchema The schema of the Parquet file.
+   * @param readContext Returned by the init method.
+   */
+  @Override
+  public RecordMaterializer<Object []> prepareForRead(
+      Configuration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadContext readContext) {
+    MessageType parquetRequestedSchema = readContext.getRequestedSchema();
+    return new ParquetRowMaterializer(parquetRequestedSchema, requestedSchema, readSchema);
+  }
+
+  public Schema getSchema() {
+    return readSchema;
+  }
+
+  public Schema getTargetSchema() {
+    return requestedSchema;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
index b227e9d..b0e952e 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -40,7 +40,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw,block_parquet</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -73,6 +73,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.block_parquet.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.sequencefile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -143,6 +147,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.block_parquet.class</name>
+    <value>parquet.hadoop.ParquetRowBlockScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.v2.parquet.class</name>
     <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
   </property>
@@ -170,7 +179,7 @@
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,block_parquet</value>
   </property>
 
   <property>
@@ -209,6 +218,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.appender-handler.block_parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
     <name>tajo.storage.appender-handler.sequencefile.class</name>
     <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
   </property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
index e81964b..11d0eb4 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
@@ -97,11 +97,14 @@ public class TestNextFetches {
       "  ]\n" +
       "}\n";
 
+  private Schema schema;
+
   private StoreType storeType;
   private boolean splitable;
   private boolean statsable;
   private Path testDir;
   private FileSystem fs;
+  private Tuple allTypedTuple;
 
   public TestNextFetches(StoreType type, boolean splitable, boolean statsable) throws IOException {
     this.storeType = type;
@@ -116,6 +119,43 @@ public class TestNextFetches {
 
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
+
+    schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    if (storeType == StoreType.RAW) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+    int columnNum = 11 + (storeType == StoreType.RAW ? 1 : 0);
+    allTypedTuple = new VTuple(columnNum);
+    allTypedTuple.put(new Datum[]{
+        DatumFactory.createBool(true),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("jinho babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+    });
+    if (storeType == StoreType.RAW) {
+      allTypedTuple.put(11, factory.createDatum(queryid.getProto()));
+    }
   }
 
   @Parameterized.Parameters
@@ -125,7 +165,7 @@ public class TestNextFetches {
         // TODO - to be implemented
 //        {StoreType.RAW, false, false},
 //        {StoreType.RCFILE, true, true},
-//        {StoreType.PARQUET, false, false},
+        {StoreType.BLOCK_PARQUET, false, false},
 //        {StoreType.SEQUENCEFILE, true, true},
 //        {StoreType.AVRO, false, false},
     });
@@ -301,12 +341,13 @@ public class TestNextFetches {
             || storeType == StoreType.TREVNI
             || storeType == StoreType.CSV
             || storeType == StoreType.PARQUET
+            || storeType == StoreType.BLOCK_PARQUET
             || storeType == StoreType.SEQUENCEFILE
             || storeType == StoreType.AVRO) {
           assertTrue(tuple.isNull(0));
         }
-        assertTrue(tupleCnt + 2 == tuple.getInt8(1));
-        assertTrue(tupleCnt + 3 == tuple.getFloat4(2));
+        assertTrue(tuple.toString(), tupleCnt + 2 == tuple.getInt8(1));
+        assertTrue(tuple.toString(), tupleCnt + 3 == tuple.getFloat4(2));
         tupleCnt++;
       }
     }
@@ -319,20 +360,6 @@ public class TestNextFetches {
 
   @Test
   public void testVariousTypes() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.CHAR, 7);
-    schema.addColumn("col3", Type.INT2);
-    schema.addColumn("col4", Type.INT4);
-    schema.addColumn("col5", Type.INT8);
-    schema.addColumn("col6", Type.FLOAT4);
-    schema.addColumn("col7", Type.FLOAT8);
-    schema.addColumn("col8", Type.TEXT);
-    schema.addColumn("col9", Type.BLOB);
-    schema.addColumn("col10", Type.INET4);
-    schema.addColumn("col11", Type.NULL_TYPE);
-    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
@@ -344,26 +371,7 @@ public class TestNextFetches {
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
-
-    QueryId queryid = new QueryId("12345", 5);
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
-    Tuple tuple = new VTuple(12);
-    tuple.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createChar("hyunsik"),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik".getBytes()),
-        DatumFactory.createInet4("192.168.0.1"),
-        NullDatum.get(),
-        factory.createDatum(queryid.getProto())
-    });
-    appender.addTuple(tuple);
+    appender.addTuple(allTypedTuple);
     appender.flush();
     appender.close();
 
@@ -379,8 +387,14 @@ public class TestNextFetches {
     while (scanner.nextFetch(rowBlock)) {
       RowBlockReader reader = rowBlock.getReader();
       while (reader.next(zcTuple)) {
-        for (int i = 0; i < tuple.size(); i++) {
-          assertEquals(tuple.get(i), zcTuple.get(i));
+        for (int i = 0; i < allTypedTuple.size(); i++) {
+          if (schema.getColumn(i).getDataType().getType() == Type.CHAR) {
+            assertEquals(i + "th column is different.",
+                allTypedTuple.get(i).asChars().trim(), zcTuple.get(i).asChars().trim());
+          } else {
+            assertEquals(i + "th column is different.", allTypedTuple.get(i), zcTuple.get(i));
+          }
+
         }
       }
     }
@@ -391,20 +405,6 @@ public class TestNextFetches {
 
   @Test
   public void testNullHandlingTypes() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.CHAR, 7);
-    schema.addColumn("col3", Type.INT2);
-    schema.addColumn("col4", Type.INT4);
-    schema.addColumn("col5", Type.INT8);
-    schema.addColumn("col6", Type.FLOAT4);
-    schema.addColumn("col7", Type.FLOAT8);
-    schema.addColumn("col8", Type.TEXT);
-    schema.addColumn("col9", Type.BLOB);
-    schema.addColumn("col10", Type.INET4);
-    schema.addColumn("col11", Type.NULL_TYPE);
-    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
@@ -421,34 +421,16 @@ public class TestNextFetches {
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
-    QueryId queryid = new QueryId("12345", 5);
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
-    Tuple seedTuple = new VTuple(12);
-    seedTuple.put(new Datum[]{
-        DatumFactory.createBool(true),                // 0
-        DatumFactory.createChar("hyunsik"),           // 1
-        DatumFactory.createInt2((short) 17),          // 2
-        DatumFactory.createInt4(59),                  // 3
-        DatumFactory.createInt8(23l),                 // 4
-        DatumFactory.createFloat4(77.9f),             // 5
-        DatumFactory.createFloat8(271.9f),            // 6
-        DatumFactory.createText("hyunsik"),           // 7
-        DatumFactory.createBlob("hyunsik".getBytes()),// 8
-        DatumFactory.createInet4("192.168.0.1"),      // 9
-        NullDatum.get(),                              // 10
-        factory.createDatum(queryid.getProto())       // 11
-    });
-
+    int columnNum = allTypedTuple.size();
     // Making tuples with different null column positions
     Tuple tuple;
-    for (int i = 0; i < 12; i++) {
-      tuple = new VTuple(12);
-      for (int j = 0; j < 12; j++) {
+    for (int i = 0; i < columnNum; i++) {
+      tuple = new VTuple(columnNum);
+      for (int j = 0; j < columnNum; j++) {
         if (i == j) { // i'th column will have NULL value
           tuple.put(j, NullDatum.get());
         } else {
-          tuple.put(j, seedTuple.get(j));
+          tuple.put(j, allTypedTuple.get(j));
         }
       }
       appender.addTuple(tuple);
@@ -471,12 +453,16 @@ public class TestNextFetches {
       RowBlockReader reader = rowBlock.getReader();
 
       while(reader.next(retrieved)) {
-        assertEquals(12, retrieved.size());
-        for (int j = 0; j < 12; j++) {
+        assertEquals(columnNum, retrieved.size());
+        for (int j = 0; j < columnNum; j++) {
           if (i == j) {
             assertEquals(NullDatum.get(), retrieved.get(j));
           } else {
-            assertEquals(seedTuple.get(j), retrieved.get(j));
+            if (schema.getColumn(j).getDataType().getType() == Type.CHAR) {
+              assertEquals(allTypedTuple.get(j).asChars().trim(), retrieved.get(j).asChars().trim());
+            } else {
+              assertEquals(allTypedTuple.get(j), retrieved.get(j));
+            }
           }
         }
 
@@ -492,20 +478,6 @@ public class TestNextFetches {
   public void testRCFileTextSerializeDeserialize() throws IOException {
     if(storeType != StoreType.RCFILE) return;
 
-    Schema schema = new Schema();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.CHAR, 7);
-    schema.addColumn("col3", Type.INT2);
-    schema.addColumn("col4", Type.INT4);
-    schema.addColumn("col5", Type.INT8);
-    schema.addColumn("col6", Type.FLOAT4);
-    schema.addColumn("col7", Type.FLOAT8);
-    schema.addColumn("col8", Type.TEXT);
-    schema.addColumn("col9", Type.BLOB);
-    schema.addColumn("col10", Type.INET4);
-    schema.addColumn("col11", Type.NULL_TYPE);
-    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
@@ -518,22 +490,8 @@ public class TestNextFetches {
     QueryId queryid = new QueryId("12345", 5);
     ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
 
-    Tuple tuple = new VTuple(12);
-    tuple.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createChar("jinho"),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("jinho"),
-        DatumFactory.createBlob("hyunsik babo".getBytes()),
-        DatumFactory.createInet4("192.168.0.1"),
-        NullDatum.get(),
-        factory.createDatum(queryid.getProto())
-    });
-    appender.addTuple(tuple);
+    int columnNum = allTypedTuple.size();
+    appender.addTuple(allTypedTuple);
     appender.flush();
     appender.close();
 
@@ -551,8 +509,8 @@ public class TestNextFetches {
     while (scanner.nextFetch(rowBlock)) {
       RowBlockReader reader = rowBlock.getReader();
       while (reader.next(retrieved)) {
-        for (int i = 0; i < tuple.size(); i++) {
-          assertEquals(tuple.get(i), retrieved.get(i));
+        for (int i = 0; i < allTypedTuple.size(); i++) {
+          assertEquals(allTypedTuple.get(i), retrieved.get(i));
         }
       }
     }
@@ -567,20 +525,6 @@ public class TestNextFetches {
   public void testRCFileBinarySerializeDeserialize() throws IOException {
     if(storeType != StoreType.RCFILE) return;
 
-    Schema schema = new Schema();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.CHAR, 7);
-    schema.addColumn("col3", Type.INT2);
-    schema.addColumn("col4", Type.INT4);
-    schema.addColumn("col5", Type.INT8);
-    schema.addColumn("col6", Type.FLOAT4);
-    schema.addColumn("col7", Type.FLOAT8);
-    schema.addColumn("col8", Type.TEXT);
-    schema.addColumn("col9", Type.BLOB);
-    schema.addColumn("col10", Type.INET4);
-    schema.addColumn("col11", Type.NULL_TYPE);
-    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
@@ -589,27 +533,7 @@ public class TestNextFetches {
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
-
-    QueryId queryid = new QueryId("12345", 5);
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
-    Tuple tuple = new VTuple(12);
-    tuple.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar("jinho"),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("jinho"),
-        DatumFactory.createBlob("hyunsik babo".getBytes()),
-        DatumFactory.createInet4("192.168.0.1"),
-        NullDatum.get(),
-        factory.createDatum(queryid.getProto())
-    });
-    appender.addTuple(tuple);
+    appender.addTuple(allTypedTuple);
     appender.flush();
     appender.close();
 
@@ -627,8 +551,8 @@ public class TestNextFetches {
     while (scanner.nextFetch(rowBlock)) {
       RowBlockReader reader = rowBlock.getReader();
       while (reader.next(retrieved)) {
-        for (int i = 0; i < tuple.size(); i++) {
-          assertEquals(tuple.get(i), retrieved.get(i));
+        for (int i = 0; i < allTypedTuple.size(); i++) {
+          assertEquals(allTypedTuple.get(i), retrieved.get(i));
         }
       }
     }
@@ -643,20 +567,6 @@ public class TestNextFetches {
   public void testSequenceFileTextSerializeDeserialize() throws IOException {
     if(storeType != StoreType.SEQUENCEFILE) return;
 
-    Schema schema = new Schema();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.CHAR, 7);
-    schema.addColumn("col3", Type.INT2);
-    schema.addColumn("col4", Type.INT4);
-    schema.addColumn("col5", Type.INT8);
-    schema.addColumn("col6", Type.FLOAT4);
-    schema.addColumn("col7", Type.FLOAT8);
-    schema.addColumn("col8", Type.TEXT);
-    schema.addColumn("col9", Type.BLOB);
-    schema.addColumn("col10", Type.INET4);
-    schema.addColumn("col11", Type.NULL_TYPE);
-    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
@@ -665,26 +575,7 @@ public class TestNextFetches {
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
-
-    QueryId queryid = new QueryId("12345", 5);
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
-    Tuple tuple = new VTuple(12);
-    tuple.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createChar("jinho"),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("jinho"),
-        DatumFactory.createBlob("hyunsik babo".getBytes()),
-        DatumFactory.createInet4("192.168.0.1"),
-        NullDatum.get(),
-        factory.createDatum(queryid.getProto())
-    });
-    appender.addTuple(tuple);
+    appender.addTuple(allTypedTuple);
     appender.flush();
     appender.close();
 
@@ -707,8 +598,8 @@ public class TestNextFetches {
     while (scanner.nextFetch(rowBlock)) {
       RowBlockReader reader = rowBlock.getReader();
       while (reader.next(retrieved)) {
-        for (int i = 0; i < tuple.size(); i++) {
-          assertEquals(tuple.get(i), retrieved.get(i));
+        for (int i = 0; i < allTypedTuple.size(); i++) {
+          assertEquals(allTypedTuple.get(i), retrieved.get(i));
         }
       }
     }
@@ -723,21 +614,6 @@ public class TestNextFetches {
   public void testSequenceFileBinarySerializeDeserialize() throws IOException {
     if(storeType != StoreType.SEQUENCEFILE) return;
 
-    Schema schema = new Schema();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.BIT);
-    schema.addColumn("col3", Type.CHAR, 7);
-    schema.addColumn("col4", Type.INT2);
-    schema.addColumn("col5", Type.INT4);
-    schema.addColumn("col6", Type.INT8);
-    schema.addColumn("col7", Type.FLOAT4);
-    schema.addColumn("col8", Type.FLOAT8);
-    schema.addColumn("col9", Type.TEXT);
-    schema.addColumn("col10", Type.BLOB);
-    schema.addColumn("col11", Type.INET4);
-    schema.addColumn("col12", Type.NULL_TYPE);
-    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
     KeyValueSet options = new KeyValueSet();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
@@ -747,26 +623,7 @@ public class TestNextFetches {
     appender.enableStats();
     appender.init();
 
-    QueryId queryid = new QueryId("12345", 5);
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
-    Tuple tuple = new VTuple(13);
-    tuple.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar("jinho"),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("jinho"),
-        DatumFactory.createBlob("hyunsik babo".getBytes()),
-        DatumFactory.createInet4("192.168.0.1"),
-        NullDatum.get(),
-        factory.createDatum(queryid.getProto())
-    });
-    appender.addTuple(tuple);
+    appender.addTuple(allTypedTuple);
     appender.flush();
     appender.close();
 
@@ -789,8 +646,8 @@ public class TestNextFetches {
     while (scanner.nextFetch(rowBlock)) {
       RowBlockReader reader = rowBlock.getReader();
       while (reader.next(retrieved)) {
-        for (int i = 0; i < tuple.size(); i++) {
-          assertEquals(tuple.get(i), retrieved.get(i));
+        for (int i = 0; i < allTypedTuple.size(); i++) {
+          assertEquals(allTypedTuple.get(i), retrieved.get(i));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
index 49a162b..bab4180 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -61,7 +61,7 @@ public class TestSchemaConverter {
       "  optional binary mytext (UTF8);\n" +
       "  optional binary myblob;\n" +
       // NULL_TYPE fields are not encoded.
-      "  optional binary myinet4;\n" +
+      "  optional int32 myinet4;\n" +
       "  optional binary myprotobuf;\n" +
       "}\n";
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
index 5bdeff0..01674a2 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -45,7 +45,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw,block_parquet</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -78,6 +78,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.block_parquet.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.sequencefile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -148,6 +152,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.block_parquet.class</name>
+    <value>parquet.hadoop.ParquetRowBlockScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.v2.parquet.class</name>
     <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
   </property>
@@ -175,7 +184,7 @@
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,block_parquet</value>
   </property>
 
   <property>
@@ -214,6 +223,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.appender-handler.block_parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
     <name>tajo.storage.appender-handler.sequencefile.class</name>
     <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
   </property>