Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:01 UTC

[04/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java
new file mode 100644
index 0000000..6d89ef2
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormat.java
@@ -0,0 +1,498 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Collections.unmodifiableMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter.RecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+public class TestInputFormat {
+
+  List<BlockMetaData> blocks;
+  BlockLocation[] hdfsBlocks;
+  FileStatus fileStatus;
+  MessageType schema;
+  FileMetaData fileMetaData;
+
+  /*
+    The test file contains 2-3 HDFS blocks depending on the settings of each test; when the HDFS block size is set to 50, the blocks are [0-49][50-99].
+    Each row group is of size 10, so the row group layout on HDFS looks like:
+    xxxxx xxxxx
+    where each x is a row group and each group of x's is an HDFS block.
+   */
+  @Before
+  public void setUp() {
+    blocks = new ArrayList<BlockMetaData>();
+    for (int i = 0; i < 10; i++) {
+      blocks.add(newBlock(i * 10, 10));
+    }
+    schema = MessageTypeParser.parseMessageType("message doc { required binary foo; }");
+    fileMetaData = new FileMetaData(schema, new HashMap<String, String>(), "parquet-mr");
+  }
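+  // setUp() creates row group i at bytes [i * 10, i * 10 + 10), so the file spans offsets 0-99
+  // and a 50-byte HDFS block holds exactly five row groups.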
+
+  @Test
+  public void testThrowExceptionWhenMaxSplitSizeIsSmallerThanMinSplitSize() throws IOException {
+    try {
+      generateSplitByMinMaxSize(50, 49);
+      fail("should throw exception when max split size is smaller than the min split size");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = 49; minSplitSize is 50"
+              , e.getMessage());
+    }
+  }
+
+  @Test
+  public void testThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
+    try {
+      generateSplitByMinMaxSize(-100, -50);
+      fail("should throw exception when max split size is negative");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = -50; minSplitSize is -100"
+              , e.getMessage());
+    }
+  }
+
+  @Test
+  public void testGetFilter() throws IOException {
+    IntColumn intColumn = intColumn("foo");
+    FilterPredicate p = or(eq(intColumn, 7), eq(intColumn, 12));
+    Configuration conf = new Configuration();
+    ParquetInputFormat.setFilterPredicate(conf, p);
+    Filter read = ParquetInputFormat.getFilter(conf);
+    assertTrue(read instanceof FilterPredicateCompat);
+    assertEquals(p, ((FilterPredicateCompat) read).getFilterPredicate());
+
+    conf = new Configuration();
+    ParquetInputFormat.setFilterPredicate(conf, not(p));
+    read = ParquetInputFormat.getFilter(conf);
+    assertTrue(read instanceof FilterPredicateCompat);
+    assertEquals(and(notEq(intColumn, 7), notEq(intColumn, 12)), ((FilterPredicateCompat) read).getFilterPredicate());
+
+    assertEquals(FilterCompat.NOOP, ParquetInputFormat.getFilter(new Configuration()));
+  }
+
+  /*
+    aaaaa bbbbb
+   */
+  @Test
+  public void testGenerateSplitsAlignedWithHDFSBlock() throws IOException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(50, 50);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+    splits = generateSplitByMinMaxSize(0, Long.MAX_VALUE);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+  }
+
+  @Test
+  public void testRowGroupNotAlignToHDFSBlock() throws IOException {
+    //Test when the HDFS block size (51) is not a multiple of the row group size (10)
+    withHDFSBlockSize(51, 51);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(50, 50);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 0);//for the second split, the first byte will still be in the first hdfs block, therefore the locations are both 0
+    shouldSplitLengthBe(splits, 50, 50);
+
+    //Test a row group that extends past the HDFS block boundary
+    withHDFSBlockSize(49, 49);
+    splits = generateSplitByMinMaxSize(50, 50);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+    /*
+    aaaa bbbbb c
+    For the 5th row group the midpoint is 45 but the first HDFS block ends at 44, therefore a new split (b) is created.
+    For the 9th row group the midpoint is 85 and the second block ends at 88, so it is considered to lie mainly
+    in the 2nd HDFS block and is therefore added as a row group of split b.
+     */
+    withHDFSBlockSize(44,44,44);
+    splits = generateSplitByMinMaxSize(40, 50);
+    shouldSplitBlockSizeBe(splits, 4, 5, 1);
+    shouldSplitLocationBe(splits, 0, 0, 2);
+    shouldSplitLengthBe(splits, 40, 50, 10);
+  }
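+
+  /*
+    Rough sketch of the rule these tests exercise (the actual logic lives in ClientSideMetadataSplitStrategy):
+    a row group is attributed to the HDFS block that contains its midpoint (roughly start + size / 2),
+    so a row group whose first byte sits in block N can still start a split located on block N+1
+    once that midpoint crosses the block boundary.
+   */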
+
+  /*
+    When the min size is 55 and the max size is 56, the first split is generated with 6 row groups (of size 10 each),
+    which satisfies split.size > min.size but not split.size < max.size.
+    aaaaa abbbb
+   */
+  @Test
+  public void testGenerateSplitsNotAlignedWithHDFSBlock() throws IOException, InterruptedException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(55, 56);
+    shouldSplitBlockSizeBe(splits, 6, 4);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 60, 40);
+
+    withHDFSBlockSize(51, 51);
+    splits = generateSplitByMinMaxSize(55, 56);
+    shouldSplitBlockSizeBe(splits, 6, 4);
+    shouldSplitLocationBe(splits, 0, 1);//since a whole row group of split a already falls in the second hdfs block, the location of split b is still 1
+    shouldSplitLengthBe(splits, 60, 40);
+
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateSplitByMinMaxSize(55, 56);
+    shouldSplitBlockSizeBe(splits, 6, 4);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 60, 40);
+
+  }
+
+  /*
+    When the max size is set to 30, the first split is of size 30;
+    when creating the second split, it is aligned to the second HDFS block, which yields a split of size 20.
+    aaabb cccdd
+   */
+  @Test
+  public void testGenerateSplitsSmallerThanMaxSizeAndAlignToHDFS() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(18, 30);
+    shouldSplitBlockSizeBe(splits, 3, 2, 3, 2);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+
+    /*
+    aaabb cccdd
+     */
+    withHDFSBlockSize(51, 51);
+    splits = generateSplitByMinMaxSize(18, 30);
+    shouldSplitBlockSizeBe(splits, 3, 2, 3, 2);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1);//the first byte of split c is in the first hdfs block
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+
+    /*
+    aaabb cccdd
+     */
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateSplitByMinMaxSize(18, 30);
+    shouldSplitBlockSizeBe(splits, 3, 2, 3, 2);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+  }
+
+  /*
+    When the min size is set to 25, the second split cannot be aligned with the HDFS block boundary,
+    therefore a split of size 30 is created as the 3rd split.
+    aaabb bcccd
+   */
+  @Test
+  public void testGenerateSplitsCrossHDFSBlockBoundaryToSatisfyMinSize() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(25, 30);
+    shouldSplitBlockSizeBe(splits, 3, 3, 3, 1);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 30, 30, 10);
+  }
+
+  /*
+    When the row group size is 10, the min split size is 10 and the max split size is 18,
+    splits of size 20 and of size 10 are created, aligned with the HDFS blocks.
+    aabbc ddeef
+   */
+  @Test
+  public void testMultipleRowGroupsInABlockToAlignHDFSBlock() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(10, 18);
+    shouldSplitBlockSizeBe(splits, 2, 2, 1, 2, 2, 1);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
+
+    /*
+    aabbc ddeef
+    Notice that the first byte of split d is in the first HDFS block:
+    when adding the 6th row group, although its first byte is in the first HDFS block,
+    its midpoint is in the second HDFS block, therefore a new split (d) is created that includes that row group.
+     */
+    withHDFSBlockSize(51, 51);
+    splits = generateSplitByMinMaxSize(10, 18);
+    shouldSplitBlockSizeBe(splits, 2, 2, 1, 2, 2, 1);
+    shouldSplitLocationBe(splits, 0, 0, 0, 0, 1, 1);// location of split d should be 0, since the first byte is in the first hdfs block
+    shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
+
+    /*
+    aabbc ddeef
+    same as the case where the block sizes are 50 and 50
+     */
+    withHDFSBlockSize(49, 49);
+    splits = generateSplitByMinMaxSize(10, 18);
+    shouldSplitBlockSizeBe(splits, 2, 2, 1, 2, 2, 1);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
+  }
+
+  public static final class DummyUnboundRecordFilter implements UnboundRecordFilter {
+    @Override
+    public RecordFilter bind(Iterable<ColumnReader> readers) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testOnlyOneKindOfFilterSupported() throws Exception {
+    IntColumn foo = intColumn("foo");
+    FilterPredicate p = or(eq(foo, 10), eq(foo, 11));
+
+    Job job = new Job();
+
+    Configuration conf = job.getConfiguration();
+    ParquetInputFormat.setUnboundRecordFilter(job, DummyUnboundRecordFilter.class);
+    try {
+      ParquetInputFormat.setFilterPredicate(conf, p);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("You cannot provide a FilterPredicate after providing an UnboundRecordFilter", e.getMessage());
+    }
+
+    job = new Job();
+    conf = job.getConfiguration();
+
+    ParquetInputFormat.setFilterPredicate(conf, p);
+    try {
+      ParquetInputFormat.setUnboundRecordFilter(job, DummyUnboundRecordFilter.class);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("You cannot provide an UnboundRecordFilter after providing a FilterPredicate", e.getMessage());
+    }
+
+  }
+
+  public static BlockMetaData makeBlockFromStats(IntStatistics stats, long valueCount) {
+    BlockMetaData blockMetaData = new BlockMetaData();
+
+    ColumnChunkMetaData column = ColumnChunkMetaData.get(ColumnPath.get("foo"),
+        PrimitiveTypeName.INT32,
+        CompressionCodecName.GZIP,
+        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+        stats,
+        100l, 100l, valueCount, 100l, 100l);
+    blockMetaData.addColumn(column);
+    blockMetaData.setTotalByteSize(200l);
+    blockMetaData.setRowCount(valueCount);
+    return blockMetaData;
+  }
+
+  @Test
+  public void testFooterCacheValueIsCurrent() throws IOException, InterruptedException {
+    File tempFile = getTempFile();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);
+
+    assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
+    assertFalse(cacheValue.isCurrent(new ParquetInputFormat.FileStatusWrapper(fs.getFileStatus(new Path(tempFile.getAbsolutePath())))));
+  }
+
+  @Test
+  public void testFooterCacheValueIsNewer() throws IOException {
+    File tempFile = getTempFile();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    ParquetInputFormat.FootersCacheValue cacheValue = getDummyCacheValue(tempFile, fs);
+
+    assertTrue(cacheValue.isNewerThan(null));
+    assertFalse(cacheValue.isNewerThan(cacheValue));
+
+    assertTrue(tempFile.setLastModified(tempFile.lastModified() + 5000));
+    ParquetInputFormat.FootersCacheValue newerCacheValue = getDummyCacheValue(tempFile, fs);
+
+    assertTrue(newerCacheValue.isNewerThan(cacheValue));
+    assertFalse(cacheValue.isNewerThan(newerCacheValue));
+  }
+
+  @Test
+  public void testDeprecatedConstructorOfParquetInputSplit() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByDeprecatedConstructor(50, 50);
+
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldOneSplitRowGroupOffsetBe(splits.get(0), 0, 10, 20, 30, 40);
+    shouldOneSplitRowGroupOffsetBe(splits.get(1), 50, 60, 70, 80, 90);
+    shouldSplitLengthBe(splits, 50, 50);
+    shouldSplitStartBe(splits, 0, 50);
+  }
+
+  private File getTempFile() throws IOException {
+    File tempFile = File.createTempFile("footer_", ".txt");
+    tempFile.deleteOnExit();
+    return tempFile;
+  }
+
+  private ParquetInputFormat.FootersCacheValue getDummyCacheValue(File file, FileSystem fs) throws IOException {
+    Path path = new Path(file.getPath());
+    FileStatus status = fs.getFileStatus(path);
+    ParquetInputFormat.FileStatusWrapper statusWrapper = new ParquetInputFormat.FileStatusWrapper(status);
+    ParquetMetadata mockMetadata = mock(ParquetMetadata.class);
+    ParquetInputFormat.FootersCacheValue cacheValue =
+            new ParquetInputFormat.FootersCacheValue(statusWrapper, new Footer(path, mockMetadata));
+    assertTrue(cacheValue.isCurrent(statusWrapper));
+    return cacheValue;
+  }
+
+  private static final Map<String, String> extramd;
+  static {
+    Map<String, String> md = new HashMap<String, String>();
+    md.put("specific", "foo");
+    extramd = unmodifiableMap(md);
+  }
+
+  private List<ParquetInputSplit> generateSplitByMinMaxSize(long min, long max) throws IOException {
+    return ClientSideMetadataSplitStrategy.generateSplits(
+        blocks, hdfsBlocks,
+        fileStatus,
+        schema.toString(),
+        extramd,
+        min, max);
+  }
+
+  private List<ParquetInputSplit> generateSplitByDeprecatedConstructor(long min, long max) throws
+      IOException {
+    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+    List<ClientSideMetadataSplitStrategy.SplitInfo> splitInfos = ClientSideMetadataSplitStrategy
+        .generateSplitInfo(blocks, hdfsBlocks, min, max);
+
+    for (ClientSideMetadataSplitStrategy.SplitInfo splitInfo : splitInfos) {
+      BlockMetaData lastRowGroup = splitInfo.getRowGroups().get(splitInfo.getRowGroupCount() - 1);
+      long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
+
+      ParquetInputSplit split = new ParquetInputSplit(fileStatus.getPath(),
+          splitInfo.hdfsBlock.getOffset(), end, splitInfo.hdfsBlock.getHosts(),
+          splitInfo.rowGroups, schema.toString(), null, null, extramd);
+      splits.add(split);
+    }
+
+    return splits;
+  }
+
+  private void shouldSplitStartBe(List<ParquetInputSplit> splits, long... offsets) {
+    assertEquals(message(splits), offsets.length, splits.size());
+    for (int i = 0; i < offsets.length; i++) {
+      assertEquals(message(splits) + i, offsets[i], splits.get(i).getStart());
+    }
+  }
+
+  private void shouldSplitBlockSizeBe(List<ParquetInputSplit> splits, int... sizes) {
+    assertEquals(message(splits), sizes.length, splits.size());
+    for (int i = 0; i < sizes.length; i++) {
+      assertEquals(message(splits) + i, sizes[i], splits.get(i).getRowGroupOffsets().length);
+    }
+  }
+
+  private void shouldSplitLocationBe(List<ParquetInputSplit> splits, int... locations) throws IOException {
+    assertEquals(message(splits), locations.length, splits.size());
+    for (int i = 0; i < locations.length; i++) {
+      int loc = locations[i];
+      ParquetInputSplit split = splits.get(i);
+      assertEquals(message(splits) + i, "[foo" + loc + ".datanode, bar" + loc + ".datanode]", Arrays.toString(split.getLocations()));
+    }
+  }
+
+  private void shouldOneSplitRowGroupOffsetBe(ParquetInputSplit split, int... rowGroupOffsets) {
+    assertEquals(split.toString(), rowGroupOffsets.length, split.getRowGroupOffsets().length);
+    for (int i = 0; i < rowGroupOffsets.length; i++) {
+      assertEquals(split.toString(), rowGroupOffsets[i], split.getRowGroupOffsets()[i]);
+    }
+  }
+
+  private String message(List<ParquetInputSplit> splits) {
+    return String.valueOf(splits) + " " + Arrays.toString(hdfsBlocks) + "\n";
+  }
+
+  private void shouldSplitLengthBe(List<ParquetInputSplit> splits, int... lengths) {
+    assertEquals(message(splits), lengths.length, splits.size());
+    for (int i = 0; i < lengths.length; i++) {
+      assertEquals(message(splits) + i, lengths[i], splits.get(i).getLength());
+    }
+  }
+
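+  // Builds consecutive HDFS BlockLocations of the given sizes, hosted on "fooN.datanode"/"barN.datanode",
+  // and a FileStatus whose length is the sum of those sizes.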
+  private void withHDFSBlockSize(long... blockSizes) {
+    hdfsBlocks = new BlockLocation[blockSizes.length];
+    long offset = 0;
+    for (int i = 0; i < blockSizes.length; i++) {
+      long blockSize = blockSizes[i];
+      hdfsBlocks[i] = new BlockLocation(new String[0], new String[]{"foo" + i + ".datanode", "bar" + i + ".datanode"}, offset, blockSize);
+      offset += blockSize;
+    }
+    fileStatus = new FileStatus(offset, false, 2, 50, 0, new Path("hdfs://foo.namenode:1234/bar"));
+  }
+
+  private BlockMetaData newBlock(long start, long compressedBlockSize) {
+    BlockMetaData blockMetaData = new BlockMetaData();
+    long uncompressedSize = compressedBlockSize * 2;//assuming the compression ratio is 2
+    ColumnChunkMetaData column = ColumnChunkMetaData.get(ColumnPath.get("foo"),
+                                                         PrimitiveTypeName.BINARY,
+                                                         CompressionCodecName.GZIP,
+                                                         new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+                                                         new BinaryStatistics(),
+                                                         start, 0l, 0l, compressedBlockSize, uncompressedSize);
+    blockMetaData.addColumn(column);
+    blockMetaData.setTotalByteSize(uncompressedSize);
+    return blockMetaData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLruCache.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLruCache.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLruCache.java
new file mode 100644
index 0000000..eddbde1
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLruCache.java
@@ -0,0 +1,162 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestLruCache {
+  private static final String DEFAULT_KEY = "test";
+
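+  // A test value whose isCurrent()/isNewerThan() answers are fixed at construction time
+  // (isCurrent can later be flipped via setCurrent).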
+  private static final class SimpleValue implements LruCache.Value<String, SimpleValue> {
+    private boolean current;
+    private boolean newerThan;
+
+    public SimpleValue(boolean current, boolean newerThan) {
+      this.current = current;
+      this.newerThan = newerThan;
+    }
+
+    @Override
+    public boolean isCurrent(String key) {
+      return current;
+    }
+
+    public void setCurrent(boolean current) {
+      this.current = current;
+    }
+
+    @Override
+    public boolean isNewerThan(SimpleValue otherValue) {
+      return newerThan;
+    }
+
+  }
+
+  @Test
+  public void testMaxSize() {
+    LruCache<String, SimpleValue> cache = new LruCache<String, SimpleValue>(1);
+
+    String oldKey = DEFAULT_KEY;
+    String newKey = oldKey + "_new";
+
+    SimpleValue oldValue = new SimpleValue(true, true);
+    cache.put(oldKey, oldValue);
+    assertEquals(oldValue, cache.getCurrentValue(oldKey));
+    assertEquals(1, cache.size());
+
+    SimpleValue newValue = new SimpleValue(true, true);
+    cache.put(newKey, newValue);
+    assertNull(cache.getCurrentValue(oldKey));
+    assertEquals(newValue, cache.getCurrentValue(newKey));
+    assertEquals(1, cache.size());
+  }
+
+  @Test
+  public void testOlderValueIsIgnored() {
+    LruCache<String, SimpleValue> cache = new LruCache<String, SimpleValue>(1);
+
+    SimpleValue currentValue = new SimpleValue(true, true);
+    SimpleValue notAsCurrentValue = new SimpleValue(true, false);
+    cache.put(DEFAULT_KEY, currentValue);
+    cache.put(DEFAULT_KEY, notAsCurrentValue);
+    assertEquals(
+            "The existing value in the cache was overwritten",
+            currentValue,
+            cache.getCurrentValue(DEFAULT_KEY)
+    );
+  }
+
+  @Test
+  public void testOutdatedValueIsIgnored() {
+    LruCache<String, SimpleValue> cache = new LruCache<String, SimpleValue>(1);
+
+    SimpleValue outdatedValue = new SimpleValue(false, true);
+    cache.put(DEFAULT_KEY, outdatedValue);
+    assertEquals(0, cache.size());
+    assertNull(cache.getCurrentValue(DEFAULT_KEY));
+  }
+
+  @Test
+  public void testCurrentValueOverwritesExisting() {
+    LruCache<String, SimpleValue> cache = new LruCache<String, SimpleValue>(1);
+
+    SimpleValue currentValue = new SimpleValue(true, true);
+    SimpleValue notAsCurrentValue = new SimpleValue(true, false);
+    cache.put(DEFAULT_KEY, notAsCurrentValue);
+    assertEquals(1, cache.size());
+    cache.put(DEFAULT_KEY, currentValue);
+    assertEquals(1, cache.size());
+    assertEquals(
+            "The existing value in the cache was NOT overwritten",
+            currentValue,
+            cache.getCurrentValue(DEFAULT_KEY)
+    );
+  }
+
+  @Test
+  public void testGetOutdatedValueReturnsNull() {
+    LruCache<String, SimpleValue> cache = new LruCache<String, SimpleValue>(1);
+
+    SimpleValue value = new SimpleValue(true, true);
+    cache.put(DEFAULT_KEY, value);
+    assertEquals(1, cache.size());
+    assertEquals(value, cache.getCurrentValue(DEFAULT_KEY));
+
+    value.setCurrent(false);
+    assertNull("The value should not be current anymore", cache.getCurrentValue(DEFAULT_KEY));
+    assertEquals(0, cache.size());
+  }
+
+  @Test
+  public void testRemove() {
+    LruCache<String, SimpleValue> cache = new LruCache<String, SimpleValue>(1);
+
+    SimpleValue value = new SimpleValue(true, true);
+    cache.put(DEFAULT_KEY, value);
+    assertEquals(1, cache.size());
+    assertEquals(value, cache.getCurrentValue(DEFAULT_KEY));
+
+    // remove the only value
+    assertEquals(value, cache.remove(DEFAULT_KEY));
+    assertNull(cache.getCurrentValue(DEFAULT_KEY));
+    assertEquals(0, cache.size());
+  }
+
+  @Test
+  public void testClear() {
+    LruCache<String, SimpleValue> cache = new LruCache<String, SimpleValue>(2);
+
+    String key1 = DEFAULT_KEY + 1;
+    String key2 = DEFAULT_KEY + 2;
+    SimpleValue value = new SimpleValue(true, true);
+    cache.put(key1, value);
+    cache.put(key2, value);
+    assertEquals(value, cache.getCurrentValue(key1));
+    assertEquals(value, cache.getCurrentValue(key2));
+    assertEquals(2, cache.size());
+
+    cache.clear();
+    assertNull(cache.getCurrentValue(key1));
+    assertNull(cache.getCurrentValue(key2));
+    assertEquals(0, cache.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
new file mode 100644
index 0000000..fd56957
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java
@@ -0,0 +1,106 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.util.Set;
+
+/**
+ * Verify that MemoryManager can adjust its writers' allocated memory size.
+ */
+public class TestMemoryManager {
+
+  Configuration conf = new Configuration();
+  String writeSchema = "message example {\n" +
+      "required int32 line;\n" +
+      "required binary content;\n" +
+      "}";
+  long expectPoolSize;
+  int rowGroupSize;
+  ParquetOutputFormat parquetOutputFormat;
+  CompressionCodecName codec;
+
+  @Before
+  public void setUp() {
+    GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema),conf);
+    expectPoolSize = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
+        () * MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
+    rowGroupSize = (int) Math.floor(expectPoolSize / 2);
+    conf.setInt(ParquetOutputFormat.BLOCK_SIZE, rowGroupSize);
+    codec = CompressionCodecName.UNCOMPRESSED;
+  }
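+  // expectPoolSize is heapMax * DEFAULT_MEMORY_POOL_RATIO and each writer is configured to ask for half of it,
+  // so the pool only has to scale the writers down once a third writer registers.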
+
+  @After
+  public void tearDown() throws Exception{
+    FileUtils.deleteDirectory(new File("target/test"));
+  }
+
+  @Test
+  public void testMemoryManager() throws Exception {
+    //Verify the adjusted rowGroupSize of writers
+    RecordWriter writer1 = createWriter(1);
+    verifyRowGroupSize(rowGroupSize);
+
+    RecordWriter writer2 = createWriter(2);
+    verifyRowGroupSize(rowGroupSize);
+
+    RecordWriter writer3 = createWriter(3);
+    verifyRowGroupSize((int) Math.floor(expectPoolSize / 3));
+
+    writer1.close(null);
+    verifyRowGroupSize(rowGroupSize);
+
+    writer2.close(null);
+    verifyRowGroupSize(rowGroupSize);
+
+    writer3.close(null);
+
+    //Verify the memory pool
+    Assert.assertEquals("memory pool size is incorrect.", expectPoolSize,
+        parquetOutputFormat.getMemoryManager().getTotalMemoryPool());
+  }
+
+  private RecordWriter createWriter(int index) throws Exception{
+    Path file = new Path("target/test/", "parquet" + index);
+    parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport());
+    return parquetOutputFormat.getRecordWriter(conf, file, codec);
+  }
+
+  private void verifyRowGroupSize(int expectRowGroupSize) {
+    Set<InternalParquetRecordWriter> writers = parquetOutputFormat.getMemoryManager()
+        .getWriterList().keySet();
+    for (InternalParquetRecordWriter writer : writers) {
+      Assert.assertEquals("wrong rowGroupSize", expectRowGroupSize,
+          writer.getRowGroupSizeThreshold(), 1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
new file mode 100644
index 0000000..20ad3e9
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -0,0 +1,499 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.format.Statistics;
+import org.apache.parquet.hadoop.metadata.*;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.Type.Repetition.*;
+import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+
+public class TestParquetFileWriter {
+  private static final Log LOG = Log.getLog(TestParquetFileWriter.class);
+  private String writeSchema;
+
+  @Test
+  public void testWriteMode() throws Exception {
+    File testDir = new File("target/test/TestParquetFileWriter/");
+    testDir.mkdirs();
+    File testFile = new File(testDir, "testParquetFile");
+    testFile = testFile.getAbsoluteFile();
+    testFile.createNewFile();
+    MessageType schema = MessageTypeParser.parseMessageType(
+        "message m { required group a {required binary b;} required group "
+        + "c { required int64 d; }}");
+    Configuration conf = new Configuration();
+
+    ParquetFileWriter writer = null;
+    boolean exceptionThrown = false;
+    Path path = new Path(testFile.toURI());
+    try {
+      writer = new ParquetFileWriter(conf, schema, path,
+          ParquetFileWriter.Mode.CREATE);
+    } catch(IOException ioe1) {
+      exceptionThrown = true;
+    }
+    assertTrue(exceptionThrown);
+    exceptionThrown = false;
+    try {
+      writer = new ParquetFileWriter(conf, schema, path,
+          ParquetFileWriter.Mode.OVERWRITE);
+    } catch(IOException ioe2) {
+      exceptionThrown = true;
+    }
+    assertTrue(!exceptionThrown);
+    testFile.delete();
+  }
+
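+  // Writes two row groups over the binary column a.b and the int64 column c.d, then checks that the footer
+  // block/column sizes, encodings, and page contents read back as written.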
+  @Test
+  public void testWriteRead() throws Exception {
+
+    File testFile = new File("target/test/TestParquetFileWriter/testParquetFile").getAbsoluteFile();
+    testFile.delete();
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}");
+    String[] path1 = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(path1);
+    String[] path2 = {"c", "d"};
+    ColumnDescriptor c2 = schema.getColumnDescription(path2);
+
+    byte[] bytes1 = { 0, 1, 2, 3};
+    byte[] bytes2 = { 1, 2, 3, 4};
+    byte[] bytes3 = { 2, 3, 4, 5};
+    byte[] bytes4 = { 3, 4, 5, 6};
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+    BinaryStatistics stats1 = new BinaryStatistics();
+    BinaryStatistics stats2 = new BinaryStatistics();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(c1, 5, codec);
+    long c1Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c1Ends = w.getPos();
+    w.startColumn(c2, 6, codec);
+    long c2Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c2Ends = w.getPos();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(c1, 7, codec);
+    w.writeDataPage(7, 4, BytesInput.from(bytes3), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(c2, 8, codec);
+    w.writeDataPage(8, 4, BytesInput.from(bytes4), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
+    assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
+    assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
+    assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize());
+    HashSet<Encoding> expectedEncoding=new HashSet<Encoding>();
+    expectedEncoding.add(PLAIN);
+    expectedEncoding.add(BIT_PACKED);
+    assertEquals(expectedEncoding,readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
+
+    { // read first block of col #1
+      ParquetFileReader r = new ParquetFileReader(configuration, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(path1)));
+      PageReadStore pages = r.readNextRowGroup();
+      assertEquals(3, pages.getRowCount());
+      validateContains(schema, pages, path1, 2, BytesInput.from(bytes1));
+      validateContains(schema, pages, path1, 3, BytesInput.from(bytes1));
+      assertNull(r.readNextRowGroup());
+    }
+
+    { // read all blocks of col #1 and #2
+
+      ParquetFileReader r = new ParquetFileReader(configuration, path, readFooter.getBlocks(), Arrays.asList(schema.getColumnDescription(path1), schema.getColumnDescription(path2)));
+
+      PageReadStore pages = r.readNextRowGroup();
+      assertEquals(3, pages.getRowCount());
+      validateContains(schema, pages, path1, 2, BytesInput.from(bytes1));
+      validateContains(schema, pages, path1, 3, BytesInput.from(bytes1));
+      validateContains(schema, pages, path2, 2, BytesInput.from(bytes2));
+      validateContains(schema, pages, path2, 3, BytesInput.from(bytes2));
+      validateContains(schema, pages, path2, 1, BytesInput.from(bytes2));
+
+      pages = r.readNextRowGroup();
+      assertEquals(4, pages.getRowCount());
+
+      validateContains(schema, pages, path1, 7, BytesInput.from(bytes3));
+      validateContains(schema, pages, path2, 8, BytesInput.from(bytes4));
+
+      assertNull(r.readNextRowGroup());
+    }
+    PrintFooter.main(new String[] {path.toString()});
+  }
+
+  @Test
+  public void testConvertToThriftStatistics() throws Exception {
+    long[] longArray = new long[] {39L, 99L, 12L, 1000L, 65L, 542L, 2533461316L, -253346131996L, Long.MAX_VALUE, Long.MIN_VALUE};
+    LongStatistics parquetMRstats = new LongStatistics();
+
+    for (long l: longArray) {
+      parquetMRstats.updateStats(l);
+    }
+    Statistics thriftStats = org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(parquetMRstats);
+    LongStatistics convertedBackStats = (LongStatistics) org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(thriftStats, PrimitiveTypeName.INT64);
+
+    assertEquals(parquetMRstats.getMax(), convertedBackStats.getMax());
+    assertEquals(parquetMRstats.getMin(), convertedBackStats.getMin());
+    assertEquals(parquetMRstats.getNumNulls(), convertedBackStats.getNumNulls());
+  }
+
+  @Test
+  public void testWriteReadStatistics() throws Exception {
+
+    File testFile = new File("target/test/TestParquetFileWriter/testParquetFile").getAbsoluteFile();
+    testFile.delete();
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}");
+    String[] path1 = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(path1);
+    String[] path2 = {"c", "d"};
+    ColumnDescriptor c2 = schema.getColumnDescription(path2);
+
+    byte[] bytes1 = { 0, 1, 2, 3};
+    byte[] bytes2 = { 1, 2, 3, 4};
+    byte[] bytes3 = { 2, 3, 4, 5};
+    byte[] bytes4 = { 3, 4, 5, 6};
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+    BinaryStatistics statsB1C1P1 = new BinaryStatistics();
+    BinaryStatistics statsB1C1P2 = new BinaryStatistics();
+    LongStatistics statsB1C2P1 = new LongStatistics();
+    LongStatistics statsB1C2P2 = new LongStatistics();
+    BinaryStatistics statsB2C1P1 = new BinaryStatistics();
+    LongStatistics statsB2C2P1 = new LongStatistics();
+    statsB1C1P1.setMinMax(Binary.fromString("s"), Binary.fromString("z"));
+    statsB1C1P2.setMinMax(Binary.fromString("a"), Binary.fromString("b"));
+    statsB1C2P1.setMinMax(2l, 10l);
+    statsB1C2P2.setMinMax(-6l, 4l);
+    statsB2C1P1.setMinMax(Binary.fromString("d"), Binary.fromString("e"));
+    statsB2C2P1.setMinMax(11l, 122l);
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(c1, 5, codec);
+    w.writeDataPage(2, 4, BytesInput.from(bytes1), statsB1C1P1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(bytes1), statsB1C1P2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(c2, 6, codec);
+    w.writeDataPage(3, 4, BytesInput.from(bytes2), statsB1C2P1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(bytes2), statsB1C2P2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+
+    w.startBlock(4);
+    w.startColumn(c1, 7, codec);
+    w.writeDataPage(7, 4, BytesInput.from(bytes3), statsB2C1P1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(c2, 8, codec);
+    w.writeDataPage(8, 4, BytesInput.from(bytes4), statsB2C2P1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    for (BlockMetaData block : readFooter.getBlocks()) {
+      for (ColumnChunkMetaData col : block.getColumns()) {
+        col.getPath();
+      }
+    }
+    // correct statistics
+    BinaryStatistics bs1 = new BinaryStatistics();
+    bs1.setMinMax(Binary.fromString("a"), Binary.fromString("z"));
+    LongStatistics ls1 = new LongStatistics();
+    ls1.setMinMax(-6l, 10l);
+
+    BinaryStatistics bs2 = new BinaryStatistics();
+    bs2.setMinMax(Binary.fromString("d"), Binary.fromString("e"));
+    LongStatistics ls2 = new LongStatistics();
+    ls2.setMinMax(11l, 122l);
+
+    { // assert stats are correct for the first block
+      BinaryStatistics bsout = (BinaryStatistics)readFooter.getBlocks().get(0).getColumns().get(0).getStatistics();
+      String str = new String(bsout.getMaxBytes());
+      String str2 = new String(bsout.getMinBytes());
+
+      assertTrue(((BinaryStatistics)readFooter.getBlocks().get(0).getColumns().get(0).getStatistics()).equals(bs1));
+      assertTrue(((LongStatistics)readFooter.getBlocks().get(0).getColumns().get(1).getStatistics()).equals(ls1));
+    }
+    { // assert stats are correct for the second block
+      assertTrue(((BinaryStatistics)readFooter.getBlocks().get(1).getColumns().get(0).getStatistics()).equals(bs2));
+      assertTrue(((LongStatistics)readFooter.getBlocks().get(1).getColumns().get(1).getStatistics()).equals(ls2));
+    }
+  }
+
+  @Test
+  public void testMetaDataFile() throws Exception {
+
+    File testDir = new File("target/test/TestParquetFileWriter/testMetaDataFileDir").getAbsoluteFile();
+
+    Path testDirPath = new Path(testDir.toURI());
+    Configuration configuration = new Configuration();
+
+    final FileSystem fs = testDirPath.getFileSystem(configuration);
+    enforceEmptyDir(configuration, testDirPath);
+
+    MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}");
+    createFile(configuration, new Path(testDirPath, "part0"), schema);
+    createFile(configuration, new Path(testDirPath, "part1"), schema);
+    createFile(configuration, new Path(testDirPath, "part2"), schema);
+
+    FileStatus outputStatus = fs.getFileStatus(testDirPath);
+    List<Footer> footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
+    validateFooters(footers);
+    ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers);
+
+    footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
+    validateFooters(footers);
+    footers = ParquetFileReader.readFooters(configuration, fs.getFileStatus(new Path(testDirPath, "part0")), false);
+    assertEquals(1, footers.size());
+
+    final FileStatus metadataFile = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_METADATA_FILE));
+    final FileStatus metadataFileLight = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE));
+    final List<Footer> metadata = ParquetFileReader.readSummaryFile(configuration, metadataFile);
+
+    validateFooters(metadata);
+
+    footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath, HiddenFileFilter.INSTANCE)), false);
+    validateFooters(footers);
+
+    fs.delete(metadataFile.getPath(), false);
+    fs.delete(metadataFileLight.getPath(), false);
+
+    footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath)), false);
+    validateFooters(footers);
+
+  }
+
+  @Test
+  public void testWriteReadStatisticsAllNulls() throws Exception {
+
+    File testFile = new File("target/test/TestParquetFileWriter/testParquetFile").getAbsoluteFile();
+    testFile.delete();
+
+    writeSchema = "message example {\n" +
+            "required binary content;\n" +
+            "}";
+
+    Path path = new Path(testFile.toURI());
+
+    MessageType schema = MessageTypeParser.parseMessageType(writeSchema);
+    Configuration configuration = new Configuration();
+    GroupWriteSupport.setSchema(schema, configuration);
+
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(path, configuration, new GroupWriteSupport());
+   
+    Group r1 = new SimpleGroup(schema);
+    writer.write(r1);
+    writer.close();
+    
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    
+    // assert the statistics object is not empty
+    assertTrue((readFooter.getBlocks().get(0).getColumns().get(0).getStatistics().isEmpty()) == false);
+    // assert the number of nulls are correct for the first block
+    assertEquals(1, (readFooter.getBlocks().get(0).getColumns().get(0).getStatistics().getNumNulls()));
+  }
+
+  private void validateFooters(final List<Footer> metadata) {
+    LOG.debug(metadata);
+    assertEquals(String.valueOf(metadata), 3, metadata.size());
+    for (Footer footer : metadata) {
+      final File file = new File(footer.getFile().toUri());
+      assertTrue(file.getName(), file.getName().startsWith("part"));
+      assertTrue(file.getPath(), file.exists());
+      final ParquetMetadata parquetMetadata = footer.getParquetMetadata();
+      assertEquals(2, parquetMetadata.getBlocks().size());
+      final Map<String, String> keyValueMetaData = parquetMetadata.getFileMetaData().getKeyValueMetaData();
+      assertEquals("bar", keyValueMetaData.get("foo"));
+      assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName()));
+    }
+  }
+
+
+  private void createFile(Configuration configuration, Path path, MessageType schema) throws IOException {
+    String[] path1 = {"a", "b"};
+    ColumnDescriptor c1 = schema.getColumnDescription(path1);
+    String[] path2 = {"c", "d"};
+    ColumnDescriptor c2 = schema.getColumnDescription(path2);
+
+    byte[] bytes1 = { 0, 1, 2, 3};
+    byte[] bytes2 = { 1, 2, 3, 4};
+    byte[] bytes3 = { 2, 3, 4, 5};
+    byte[] bytes4 = { 3, 4, 5, 6};
+    CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED;
+
+    BinaryStatistics stats1 = new BinaryStatistics();
+    BinaryStatistics stats2 = new BinaryStatistics();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(c1, 5, codec);
+    w.writeDataPage(2, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(c2, 6, codec);
+    w.writeDataPage(2, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(c1, 7, codec);
+    w.writeDataPage(7, 4, BytesInput.from(bytes3), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(c2, 8, codec);
+    w.writeDataPage(8, 4, BytesInput.from(bytes4), stats2, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    final HashMap<String, String> extraMetaData = new HashMap<String, String>();
+    extraMetaData.put("foo", "bar");
+    extraMetaData.put(path.getName(), path.getName());
+    w.end(extraMetaData);
+  }
+
+  private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException {
+    PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
+    DataPage page = pageReader.readPage();
+    assertEquals(values, page.getValueCount());
+    assertArrayEquals(bytes.toByteArray(), ((DataPageV1)page).getBytes().toByteArray());
+  }
+
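+  // Merging file metadata should union the schemas (fields a and b from root1 plus c from root2) under the first root name.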
+  @Test
+  public void testMergeMetadata() {
+    FileMetaData md1 = new FileMetaData(
+        new MessageType("root1",
+            new PrimitiveType(REPEATED, BINARY, "a"),
+            new PrimitiveType(OPTIONAL, BINARY, "b")),
+        new HashMap<String, String>(), "test");
+    FileMetaData md2 = new FileMetaData(
+        new MessageType("root2",
+            new PrimitiveType(REQUIRED, BINARY, "c")),
+        new HashMap<String, String>(), "test2");
+    GlobalMetaData merged = ParquetFileWriter.mergeInto(md2, ParquetFileWriter.mergeInto(md1, null));
+    assertEquals(
+        merged.getSchema(),
+        new MessageType("root1",
+            new PrimitiveType(REPEATED, BINARY, "a"),
+            new PrimitiveType(OPTIONAL, BINARY, "b"),
+            new PrimitiveType(REQUIRED, BINARY, "c"))
+        );
+
+  }
+
+  @Test
+  public void testMergeFooters() {
+    List<BlockMetaData> oneBlocks = new ArrayList<BlockMetaData>();
+    oneBlocks.add(new BlockMetaData());
+    oneBlocks.add(new BlockMetaData());
+    List<BlockMetaData> twoBlocks = new ArrayList<BlockMetaData>();
+    twoBlocks.add(new BlockMetaData());
+    List<BlockMetaData> expected = new ArrayList<BlockMetaData>();
+    expected.addAll(oneBlocks);
+    expected.addAll(twoBlocks);
+
+    Footer one = new Footer(new Path("file:/tmp/output/one.parquet"),
+        new ParquetMetadata(new FileMetaData(
+            new MessageType("root1",
+                new PrimitiveType(REPEATED, BINARY, "a"),
+                new PrimitiveType(OPTIONAL, BINARY, "b")),
+            new HashMap<String, String>(), "test"),
+        oneBlocks));
+
+    Footer two = new Footer(new Path("/tmp/output/two.parquet"),
+        new ParquetMetadata(new FileMetaData(
+            new MessageType("root2",
+                new PrimitiveType(REQUIRED, BINARY, "c")),
+            new HashMap<String, String>(), "test2"),
+            twoBlocks));
+
+    List<Footer> footers = new ArrayList<Footer>();
+    footers.add(one);
+    footers.add(two);
+
+    ParquetMetadata merged = ParquetFileWriter.mergeFooters(
+        new Path("/tmp"), footers);
+
+    assertEquals(
+        new MessageType("root1",
+            new PrimitiveType(REPEATED, BINARY, "a"),
+            new PrimitiveType(OPTIONAL, BINARY, "b"),
+            new PrimitiveType(REQUIRED, BINARY, "c")),
+        merged.getFileMetaData().getSchema());
+
+    assertEquals("Should have all blocks", expected, merged.getBlocks());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
new file mode 100644
index 0000000..2583278
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -0,0 +1,129 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+
+public class TestParquetWriter {
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    Path root = new Path("target/tests/TestParquetWriter/");
+    enforceEmptyDir(conf, root);
+    MessageType schema = parseMessageType(
+        "message test { "
+        + "required binary binary_field; "
+        + "required int32 int32_field; "
+        + "required int64 int64_field; "
+        + "required boolean boolean_field; "
+        + "required float float_field; "
+        + "required double double_field; "
+        + "required fixed_len_byte_array(3) flba_field; "
+        + "required int96 int96_field; "
+        + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    Map<String, Encoding> expected = new HashMap<String, Encoding>();
+    expected.put("10-" + PARQUET_1_0, PLAIN_DICTIONARY);
+    expected.put("1000-" + PARQUET_1_0, PLAIN);
+    expected.put("10-" + PARQUET_2_0, RLE_DICTIONARY);
+    expected.put("1000-" + PARQUET_2_0, DELTA_BYTE_ARRAY);
+    for (int modulo : asList(10, 1000)) {
+      for (WriterVersion version : WriterVersion.values()) {
+        Path file = new Path(root, version.name() + "_" + modulo);
+        ParquetWriter<Group> writer = new ParquetWriter<Group>(
+            file,
+            new GroupWriteSupport(),
+            UNCOMPRESSED, 1024, 1024, 512, true, false, version, conf);
+        for (int i = 0; i < 1000; i++) {
+          writer.write(
+              f.newGroup()
+              .append("binary_field", "test" + (i % modulo))
+              .append("int32_field", 32)
+              .append("int64_field", 64l)
+              .append("boolean_field", true)
+              .append("float_field", 1.0f)
+              .append("double_field", 2.0d)
+              .append("flba_field", "foo")
+              .append("int96_field", Binary.fromByteArray(new byte[12])));
+        }
+        writer.close();
+        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).withConf(conf).build();
+        for (int i = 0; i < 1000; i++) {
+          Group group = reader.read();
+          assertEquals("test" + (i % modulo), group.getBinary("binary_field", 0).toStringUsingUTF8());
+          assertEquals(32, group.getInteger("int32_field", 0));
+          assertEquals(64L, group.getLong("int64_field", 0));
+          assertEquals(true, group.getBoolean("boolean_field", 0));
+          assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+          assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+          assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
+          assertEquals(Binary.fromByteArray(new byte[12]), group.getInt96("int96_field", 0));
+        }
+        reader.close();
+        ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
+        for (BlockMetaData blockMetaData : footer.getBlocks()) {
+          for (ColumnChunkMetaData column : blockMetaData.getColumns()) {
+            if (column.getPath().toDotString().equals("binary_field")) {
+              String key = modulo + "-" + version;
+              Encoding expectedEncoding = expected.get(key);
+              assertTrue(
+                  key + ":" + column.getEncodings() + " should contain " + expectedEncoding,
+                  column.getEncodings().contains(expectedEncoding));
+            }
+          }
+        }
+      }
+    }
+  }
+}
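
The ten-argument ParquetWriter constructor used in the loop above is easy to misread, so the sketch below repeats the same call with each positional argument annotated; the meanings simply label the constructor exercised in the test and are given as a reading aid:

    ParquetWriter<Group> writer = new ParquetWriter<Group>(
        file,                    // destination Path
        new GroupWriteSupport(), // WriteSupport converting Group records to Parquet
        UNCOMPRESSED,            // compression codec
        1024,                    // row group (block) size in bytes
        1024,                    // page size in bytes
        512,                     // dictionary page size in bytes
        true,                    // enable dictionary encoding
        false,                   // disable schema/record validation
        version,                 // PARQUET_1_0 or PARQUET_2_0
        conf);                   // Hadoop Configuration carrying the schema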

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
new file mode 100644
index 0000000..9dd1323
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
@@ -0,0 +1,135 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+
+public class TestParquetWriterNewPage {
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    Path root = new Path("target/tests/TestParquetWriter/");
+    FileSystem fs = root.getFileSystem(conf);
+    if (fs.exists(root)) {
+      fs.delete(root, true);
+    }
+    fs.mkdirs(root);
+    MessageType schema = parseMessageType(
+        "message test { "
+        + "required binary binary_field; "
+        + "required int32 int32_field; "
+        + "required int64 int64_field; "
+        + "required boolean boolean_field; "
+        + "required float float_field; "
+        + "required double double_field; "
+        + "required fixed_len_byte_array(3) flba_field; "
+        + "required int96 int96_field; "
+        + "optional binary null_field; "
+        + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    Map<String, Encoding> expected = new HashMap<String, Encoding>();
+    expected.put("10-" + PARQUET_1_0, PLAIN_DICTIONARY);
+    expected.put("1000-" + PARQUET_1_0, PLAIN);
+    expected.put("10-" + PARQUET_2_0, RLE_DICTIONARY);
+    expected.put("1000-" + PARQUET_2_0, DELTA_BYTE_ARRAY);
+    for (int modulo : asList(10, 1000)) {
+      for (WriterVersion version : WriterVersion.values()) {
+        Path file = new Path(root, version.name() + "_" + modulo);
+        ParquetWriter<Group> writer = new ParquetWriter<Group>(
+            file,
+            new GroupWriteSupport(),
+            UNCOMPRESSED, 1024, 1024, 512, true, false, version, conf);
+        for (int i = 0; i < 1000; i++) {
+          writer.write(
+              f.newGroup()
+              .append("binary_field", "test" + (i % modulo))
+              .append("int32_field", 32)
+              .append("int64_field", 64l)
+              .append("boolean_field", true)
+              .append("float_field", 1.0f)
+              .append("double_field", 2.0d)
+              .append("flba_field", "foo")
+              .append("int96_field", Binary.fromByteArray(new byte[12])));
+        }
+        writer.close();
+
+        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).withConf(conf).build();
+        for (int i = 0; i < 1000; i++) {
+          Group group = reader.read();
+          assertEquals("test" + (i % modulo), group.getBinary("binary_field", 0).toStringUsingUTF8());
+          assertEquals(32, group.getInteger("int32_field", 0));
+          assertEquals(64L, group.getLong("int64_field", 0));
+          assertEquals(true, group.getBoolean("boolean_field", 0));
+          assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
+          assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
+          assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
+          assertEquals(Binary.fromByteArray(new byte[12]), group.getInt96("int96_field", 0));
+          assertEquals(0, group.getFieldRepetitionCount("null_field"));
+        }
+        reader.close();
+        ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
+        for (BlockMetaData blockMetaData : footer.getBlocks()) {
+          for (ColumnChunkMetaData column : blockMetaData.getColumns()) {
+            if (column.getPath().toDotString().equals("binary_field")) {
+              String key = modulo + "-" + version;
+              Encoding expectedEncoding = expected.get(key);
+              assertTrue(
+                  key + ":" + column.getEncodings() + " should contain " + expectedEncoding,
+                  column.getEncodings().contains(expectedEncoding));
+            }
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestSnappyCodec.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestSnappyCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestSnappyCodec.java
new file mode 100644
index 0000000..fc5ac0f
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestSnappyCodec.java
@@ -0,0 +1,140 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.xerial.snappy.Snappy;
+
+import org.apache.parquet.hadoop.codec.SnappyCodec;
+import org.apache.parquet.hadoop.codec.SnappyCompressor;
+import org.apache.parquet.hadoop.codec.SnappyDecompressor;
+
+public class TestSnappyCodec {
+  @Test
+  public void TestSnappy() throws IOException {
+    // Reuse the snappy objects between test cases
+    SnappyCompressor compressor = new SnappyCompressor();
+    SnappyDecompressor decompressor = new SnappyDecompressor();
+
+    TestSnappy(compressor, decompressor, "");    
+    TestSnappy(compressor, decompressor, "FooBar");    
+    TestSnappy(compressor, decompressor, "FooBar1", "FooBar2");    
+    TestSnappy(compressor, decompressor, "FooBar");
+    TestSnappy(compressor, decompressor, "a", "blahblahblah", "abcdef");    
+    TestSnappy(compressor, decompressor, "");
+    TestSnappy(compressor, decompressor, "FooBar");
+  }
+  
+  @Test
+  public void TestSnappyStream() throws IOException {
+    SnappyCodec codec = new SnappyCodec();
+    codec.setConf(new Configuration());
+    
+    int blockSize = 1024;
+    int inputSize = blockSize * 1024;
+ 
+    byte[] input = new byte[inputSize];
+    for (int i = 0; i < inputSize; ++i) {
+      input[i] = (byte)i;
+    }
+
+    ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+    
+    CompressionOutputStream compressor = codec.createOutputStream(compressedStream);
+    int bytesCompressed = 0;
+    while (bytesCompressed < inputSize) {
+      int len = Math.min(inputSize - bytesCompressed, blockSize);
+      compressor.write(input, bytesCompressed, len);
+      bytesCompressed += len;
+    }
+    compressor.finish();
+    
+    byte[] rawCompressed = Snappy.compress(input);
+    byte[] codecCompressed = compressedStream.toByteArray();
+    
+    // Validate that the result from the codec is the same as if we compressed the 
+    // buffer directly.
+    assertArrayEquals(rawCompressed, codecCompressed);
+
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(codecCompressed);    
+    CompressionInputStream decompressor = codec.createInputStream(inputStream);
+    byte[] codecDecompressed = new byte[inputSize];
+    int bytesDecompressed = 0;
+    int numBytes;
+    while ((numBytes = decompressor.read(codecDecompressed, bytesDecompressed, blockSize)) != 0) {
+      bytesDecompressed += numBytes;
+      if (bytesDecompressed == inputSize) break;
+    }
+    
+    byte[] rawDecompressed = Snappy.uncompress(rawCompressed);
+    
+    assertArrayEquals(input, rawDecompressed);
+    assertArrayEquals(input, codecDecompressed);
+  }
+
+  private void TestSnappy(SnappyCompressor compressor, SnappyDecompressor decompressor, 
+      String... strings) throws IOException {
+    compressor.reset();
+    decompressor.reset();
+
+    int uncompressedSize = 0;
+    for (String s: strings) {
+      uncompressedSize += s.length();
+    }
+    byte[] uncompressedData = new byte[uncompressedSize];
+    int len = 0;
+    for (String s: strings) {
+      byte[] tmp = s.getBytes();
+      System.arraycopy(tmp, 0, uncompressedData, len, s.length());
+      len += s.length();
+    }
+
+    assert(compressor.needsInput());
+    compressor.setInput(uncompressedData, 0, len);
+    assert(compressor.needsInput());
+    compressor.finish();
+    assert(!compressor.needsInput());
+    assert(!compressor.finished() || uncompressedSize == 0);
+    byte[] compressedData = new byte[1000];
+
+    int compressedSize = compressor.compress(compressedData, 0, 1000);
+    assert(compressor.finished());
+
+    assert(!decompressor.finished());
+    assert(decompressor.needsInput());
+    decompressor.setInput(compressedData, 0, compressedSize);
+    assert(!decompressor.finished());
+    byte[] decompressedData = new byte[uncompressedSize];
+    int decompressedSize = decompressor.decompress(decompressedData, 0, uncompressedSize);
+    assert(decompressor.finished());
+
+    assertEquals(uncompressedSize, decompressedSize);
+    assertArrayEquals(uncompressedData, decompressedData);
+  }
+}
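
Outside this unit test, the codec is normally driven through the Hadoop stream API shown in TestSnappyStream. A condensed sketch of one round trip, using the imports already present in this file and assuming an arbitrary input byte array named data:

    // Illustrative only: compress and decompress through SnappyCodec's stream API.
    SnappyCodec codec = new SnappyCodec();
    codec.setConf(new Configuration());

    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(data);
    out.finish(); // flush the buffered block; the test asserts this equals Snappy.compress(data)

    CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()));
    byte[] restored = new byte[data.length];
    int off = 0, n;
    while (off < restored.length && (n = in.read(restored, off, restored.length - off)) > 0) {
      off += n;
    }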

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java
new file mode 100644
index 0000000..fcfd517
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java
@@ -0,0 +1,40 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestUtils {
+
+  public static void enforceEmptyDir(Configuration conf, Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("can not delete path " + path);
+      }
+    }
+    if (!fs.mkdirs(path)) {
+      throw new IOException("can not create path " + path);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/CodecConfigTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/CodecConfigTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/CodecConfigTest.java
new file mode 100644
index 0000000..57200b2
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/CodecConfigTest.java
@@ -0,0 +1,77 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.junit.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.*;
+import org.junit.Test;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+public class CodecConfigTest {
+  @Test
+  public void testReadingCodecs() throws IOException {
+    shouldUseParquetFlagToSetCodec("gzip", CompressionCodecName.GZIP);
+    shouldUseHadoopFlagToSetCodec(CompressionCodecName.GZIP.getHadoopCompressionCodecClassName(), CompressionCodecName.GZIP);
+    shouldUseParquetFlagToSetCodec("snappy", CompressionCodecName.SNAPPY);
+    shouldUseHadoopFlagToSetCodec(CompressionCodecName.SNAPPY.getHadoopCompressionCodecClassName(), CompressionCodecName.SNAPPY);
+    // When the codec name is unrecognized, fall back to uncompressed
+    shouldUseHadoopFlagToSetCodec("unexistedCodec", CompressionCodecName.UNCOMPRESSED);
+    // For an unsupported codec class, fall back to uncompressed
+    shouldUseHadoopFlagToSetCodec("org.apache.hadoop.io.compress.DefaultCodec", CompressionCodecName.UNCOMPRESSED);
+  }
+
+  public void shouldUseParquetFlagToSetCodec(String codecNameStr, CompressionCodecName expectedCodec) throws IOException {
+
+    //Test mapreduce API
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    conf.set(ParquetOutputFormat.COMPRESSION, codecNameStr);
+    TaskAttemptContext task = ContextUtil.newTaskAttemptContext(conf, new TaskAttemptID(new TaskID(new JobID("test", 1), false, 1), 1));
+    Assert.assertEquals(expectedCodec, CodecConfig.from(task).getCodec());
+
+    //Test mapred API
+    JobConf jobConf = new JobConf();
+    jobConf.set(ParquetOutputFormat.COMPRESSION, codecNameStr);
+    Assert.assertEquals(expectedCodec, CodecConfig.from(jobConf).getCodec());
+  }
+
+  public void shouldUseHadoopFlagToSetCodec(String codecClassStr, CompressionCodecName expectedCodec) throws IOException {
+    //Test mapreduce API
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    conf.setBoolean("mapred.output.compress", true);
+    conf.set("mapred.output.compression.codec", codecClassStr);
+    TaskAttemptContext task = ContextUtil.newTaskAttemptContext(conf, new TaskAttemptID(new TaskID(new JobID("test", 1), false, 1), 1));
+    Assert.assertEquals(expectedCodec, CodecConfig.from(task).getCodec());
+
+    //Test mapred API
+    JobConf jobConf = new JobConf();
+    jobConf.setBoolean("mapred.output.compress", true);
+    jobConf.set("mapred.output.compression.codec", codecClassStr);
+    Assert.assertEquals(expectedCodec, CodecConfig.from(jobConf).getCodec());
+  }
+
+
+}
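
The two helpers cover the two ways a job can request a codec: the Parquet-specific property, and the generic Hadoop output-compression flags, with unknown or unsupported codec classes degrading to UNCOMPRESSED. A minimal configuration sketch restating the same properties the test sets, assuming a Job named job as in the helpers above:

    // Option 1: Parquet-specific property (ParquetOutputFormat.COMPRESSION).
    Configuration conf = job.getConfiguration();
    conf.set(ParquetOutputFormat.COMPRESSION, "snappy");

    // Option 2: generic Hadoop flags, as in shouldUseHadoopFlagToSetCodec.
    conf.setBoolean("mapred.output.compress", true);
    conf.set("mapred.output.compression.codec",
        CompressionCodecName.SNAPPY.getHadoopCompressionCodecClassName());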

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/GroupReadSupportTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/GroupReadSupportTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/GroupReadSupportTest.java
new file mode 100644
index 0000000..2a99a1b
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/GroupReadSupportTest.java
@@ -0,0 +1,65 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.example;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import java.util.HashMap;
+import java.util.Map;
+import static org.junit.Assert.assertEquals;
+
+public class GroupReadSupportTest {
+
+  private String fullSchemaStr = "message example {\n" +
+          "required int32 line;\n" +
+          "optional binary content;\n" +
+          "}";
+
+  private String partialSchemaStr = "message example {\n" +
+          "required int32 line;\n" +
+          "}";
+
+
+  @Test
+  public void testInitWithoutSpecifyingRequestSchema() throws Exception {
+    GroupReadSupport s = new GroupReadSupport();
+    Configuration configuration = new Configuration();
+    Map<String, String> keyValueMetaData = new HashMap<String, String>();
+    MessageType fileSchema = MessageTypeParser.parseMessageType(fullSchemaStr);
+
+    ReadSupport.ReadContext context = s.init(configuration, keyValueMetaData, fileSchema);
+    assertEquals(fileSchema, context.getRequestedSchema());
+  }
+
+  @Test
+  public void testInitWithPartialSchema() {
+    GroupReadSupport s = new GroupReadSupport();
+    Configuration configuration = new Configuration();
+    Map<String, String> keyValueMetaData = new HashMap<String, String>();
+    MessageType fileSchema = MessageTypeParser.parseMessageType(fullSchemaStr);
+    MessageType partialSchema = MessageTypeParser.parseMessageType(partialSchemaStr);
+    configuration.set(ReadSupport.PARQUET_READ_SCHEMA, partialSchemaStr);
+
+    ReadSupport.ReadContext context = s.init(configuration, keyValueMetaData, fileSchema);
+    assertEquals(partialSchema, context.getRequestedSchema());
+  }
+}
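
The second test exercises projection: when parquet.read.schema (ReadSupport.PARQUET_READ_SCHEMA) is set, the requested schema is the projected subset rather than the full file schema. A minimal read-side sketch reusing the reader builder shown in TestParquetWriter, with a hypothetical file path and the example Group API:

    Configuration conf = new Configuration();
    conf.set(ReadSupport.PARQUET_READ_SCHEMA,
        "message example { required int32 line; }"); // project just the "line" column
    ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/example.parquet"))
            .withConf(conf)
            .build();
    Group g;
    while ((g = reader.read()) != null) {
      int line = g.getInteger("line", 0); // only projected fields are materialized
    }
    reader.close();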