Posted to commits@parquet.apache.org by ti...@apache.org on 2014/09/05 20:33:07 UTC

[1/2] PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 647b8a70f -> 5dafd127f


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index 67c7dd7..6b89e37 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -15,9 +15,18 @@
  */
 package parquet.hadoop;
 
+import static parquet.format.converter.ParquetMetadataConverter.range;
+
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -27,10 +36,14 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import parquet.Log;
 import parquet.filter.UnboundRecordFilter;
 import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.RowGroupFilter;
 import parquet.filter2.compat.FilterCompat.Filter;
 import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
 import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
 /**
@@ -44,7 +57,7 @@ import parquet.schema.MessageTypeParser;
  */
 public class ParquetRecordReader<T> extends RecordReader<Void, T> {
 
-  private static final Log LOG= Log.getLog(ParquetRecordReader.class);
+  private static final Log LOG = Log.getLog(ParquetRecordReader.class);
   private final InternalParquetRecordReader<T> internalReader;
 
   /**
@@ -113,9 +126,9 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
       throws IOException, InterruptedException {
     if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) {
       BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>) context);
-    }else{
+    } else {
       LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
-              +context.getClass().getCanonicalName());
+              + context.getClass().getCanonicalName());
     }
 
     initializeInternalReader((ParquetInputSplit)inputSplit, ContextUtil.getConfiguration(context));
@@ -128,12 +141,53 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
   }
 
   private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
-
+    Path path = split.getPath();
+    ParquetMetadata footer = ParquetFileReader.readFooter(
+        configuration, path, range(split.getStart(), split.getEnd()));
+    long[] rowGroupOffsets = split.getRowGroupOffsets();
+    List<BlockMetaData> filteredBlocks;
+    // if task.side.metadata is set, rowGroupOffsets is null
+    MessageType fileSchema = footer.getFileMetaData().getSchema();
+    if (rowGroupOffsets == null) {
+      // then we need to apply the predicate push down filter
+      Filter filter = ParquetInputFormat.getFilter(configuration);
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema);
+    } else {
+      // otherwise we find the row groups that were selected on the client
+      Set<Long> offsets = new HashSet<Long>();
+      for (long offset : rowGroupOffsets) {
+        offsets.add(offset);
+      }
+      filteredBlocks = new ArrayList<BlockMetaData>();
+      for (BlockMetaData block : footer.getBlocks()) {
+        if (offsets.contains(block.getStartingPos())) {
+          filteredBlocks.add(block);
+        }
+      }
+      // verify we found them all
+      if (filteredBlocks.size() != rowGroupOffsets.length) {
+        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+        }
+        // this should never happen.
+        // provide a good error message in case there's a bug
+        throw new IllegalStateException(
+            "All the offsets listed in the split should be found in the file."
+            + " expected: " + Arrays.toString(rowGroupOffsets)
+            + " found: " + filteredBlocks
+            + " out of: " + Arrays.toString(foundRowGroupOffsets)
+            + " in range " + split.getStart() + ", " + split.getEnd());
+      }
+    }
+    MessageType requestedSchema = MessageTypeParser.parseMessageType(split.getRequestedSchema());
+    Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
+    Map<String, String> readSupportMetadata = split.getReadSupportMetadata();
     internalReader.initialize(
-        MessageTypeParser.parseMessageType(split.getRequestedSchema()),
-        MessageTypeParser.parseMessageType(split.getFileSchema()),
-        split.getExtraMetadata(), split.getReadSupportMetadata(), split.getPath(),
-        split.getBlocks(), configuration);
+        requestedSchema,fileSchema,
+        fileMetaData, readSupportMetadata,
+        path,
+        filteredBlocks, configuration);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java b/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
index 68a2fa5..500b016 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/PrintFooter.java
@@ -15,6 +15,7 @@
  */
 package parquet.hadoop;
 
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
 
 import java.net.URI;
@@ -99,7 +100,7 @@ public class PrintFooter {
             @Override
             public ParquetMetadata call() throws Exception {
               try {
-                ParquetMetadata footer = ParquetFileReader.readFooter(configuration, currentFile);
+                ParquetMetadata footer = ParquetFileReader.readFooter(configuration, currentFile, NO_FILTER);
                 return footer;
               } catch (Exception e) {
                 throw new ParquetDecodingException("could not read footer", e);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
index 9544865..11c7c38 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -15,10 +15,11 @@
  */
 package parquet.hadoop.mapred;
 
+import static java.util.Arrays.asList;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.mapred.InputSplit;
@@ -31,7 +32,6 @@ import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.ParquetInputSplit;
 import parquet.hadoop.ParquetRecordReader;
 
-@SuppressWarnings("deprecation")
 public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.FileInputFormat<Void, Container<V>> {
 
   protected ParquetInputFormat<V> realInputFormat = new ParquetInputFormat<V>();
@@ -46,22 +46,19 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     List<Footer> footers = getFooters(job);
     List<ParquetInputSplit> splits = realInputFormat.getSplits(job, footers);
-
-      if (splits == null) {
-        return null;
-      }
-
-      InputSplit[] resultSplits = new InputSplit[splits.size()];
-      int i = 0;
-      for (ParquetInputSplit split : splits) {
-          resultSplits[i++] = new ParquetInputSplitWrapper(split);
-      }
-
-      return resultSplits;
+    if (splits == null) {
+      return null;
+    }
+    InputSplit[] resultSplits = new InputSplit[splits.size()];
+    int i = 0;
+    for (ParquetInputSplit split : splits) {
+      resultSplits[i++] = new ParquetInputSplitWrapper(split);
+    }
+    return resultSplits;
   }
 
   public List<Footer> getFooters(JobConf job) throws IOException {
-    return realInputFormat.getFooters(job, Arrays.asList(super.listStatus(job)));
+    return realInputFormat.getFooters(job, asList(super.listStatus(job)));
   }
 
   private static class RecordReaderWrapper<V> implements RecordReader<Void, Container<V>> {
@@ -157,13 +154,10 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
     }
   }
 
-
-
   private static class ParquetInputSplitWrapper implements InputSplit {
 
     ParquetInputSplit realSplit;
 
-
     @SuppressWarnings("unused") // MapReduce instantiates this.
     public ParquetInputSplitWrapper() {}
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
index 69bf36b..cbc9318 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -15,12 +15,6 @@
  */
 package parquet.hadoop.mapred;
 
-import parquet.Log;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.ParquetRecordWriter;
-import parquet.hadoop.codec.CodecConfig;
-import parquet.hadoop.metadata.CompressionCodecName;
-
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,9 +25,12 @@ import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
-@SuppressWarnings("deprecation")
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.ParquetRecordWriter;
+import parquet.hadoop.codec.CodecConfig;
+import parquet.hadoop.metadata.CompressionCodecName;
+
 public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.FileOutputFormat<Void, V> {
-  private static final Log LOG = Log.getLog(DeprecatedParquetOutputFormat.class);
 
   public static void setWriteSupportClass(Configuration configuration,  Class<?> writeSupportClass) {
     configuration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, writeSupportClass.getName());
@@ -69,10 +66,10 @@ public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.F
   @Override
   public RecordWriter<Void, V> getRecordWriter(FileSystem fs,
       JobConf conf, String name, Progressable progress) throws IOException {
-    return new RecordWriterWrapper<V>(realOutputFormat, fs, conf, name, progress);
+    return new RecordWriterWrapper(realOutputFormat, fs, conf, name, progress);
   }
 
-  private class RecordWriterWrapper<V> implements RecordWriter<Void, V> {
+  private class RecordWriterWrapper implements RecordWriter<Void, V> {
 
     private ParquetRecordWriter<V> realWriter;
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
index 7cb6d73..6d5979a 100644
--- a/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
@@ -15,25 +15,40 @@
  */
 package parquet.format.converter;
 
+import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static parquet.format.CompressionCodec.UNCOMPRESSED;
+import static parquet.format.Type.INT32;
 import static parquet.format.Util.readPageHeader;
 import static parquet.format.Util.writePageHeader;
+import static parquet.format.converter.ParquetMetadataConverter.filterFileMetaData;
+import static parquet.format.converter.ParquetMetadataConverter.getOffset;
 
-import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import parquet.column.Encoding;
 import parquet.example.Paper;
+import parquet.format.ColumnChunk;
+import parquet.format.ColumnMetaData;
 import parquet.format.ConvertedType;
 import parquet.format.FieldRepetitionType;
+import parquet.format.FileMetaData;
 import parquet.format.PageHeader;
 import parquet.format.PageType;
+import parquet.format.RowGroup;
 import parquet.format.SchemaElement;
 import parquet.format.Type;
 import parquet.schema.MessageType;
@@ -42,6 +57,8 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
 import parquet.schema.Types;
 
+import com.google.common.collect.Lists;
+
 public class TestParquetMetadataConverter {
 
   @Test
@@ -117,4 +134,113 @@ public class TestParquetMetadataConverter {
     }
   }
 
+  private FileMetaData metadata(long... sizes) {
+    List<SchemaElement> schema = emptyList();
+    List<RowGroup> rowGroups = new ArrayList<RowGroup>();
+    long offset = 0;
+    for (long size : sizes) {
+      ColumnChunk columnChunk = new ColumnChunk(offset);
+      columnChunk.setMeta_data(new ColumnMetaData(
+          INT32,
+          Collections.<parquet.format.Encoding>emptyList(),
+          Collections.<String>emptyList(),
+          UNCOMPRESSED, 10l, size * 2, size, offset));
+      rowGroups.add(new RowGroup(Arrays.asList(columnChunk), size, 1));
+      offset += size;
+    }
+    return new FileMetaData(1, schema, sizes.length, rowGroups);
+  }
+
+  private FileMetaData filter(FileMetaData md, long start, long end) {
+    return filterFileMetaData(new FileMetaData(md), new ParquetMetadataConverter.RangeMetadataFilter(start, end));
+  }
+
+  private void verifyMD(FileMetaData md, long... offsets) {
+    assertEquals(offsets.length, md.row_groups.size());
+    for (int i = 0; i < offsets.length; i++) {
+      long offset = offsets[i];
+      RowGroup rowGroup = md.getRow_groups().get(i);
+      assertEquals(offset, getOffset(rowGroup));
+    }
+  }
+
+  /**
+   * verifies that splits will end up being a partition of the rowgroup
+   * they are all found only once
+   * @param md
+   * @param splitWidth
+   */
+  private void verifyAllFilters(FileMetaData md, long splitWidth) {
+    Set<Long> offsetsFound = new TreeSet<Long>();
+    for (long start = 0; start < fileSize(md); start += splitWidth) {
+      FileMetaData filtered = filter(md, start, start + splitWidth);
+      for (RowGroup rg : filtered.getRow_groups()) {
+        long o = getOffset(rg);
+        if (offsetsFound.contains(o)) {
+          fail("found the offset twice: " + o);
+        } else {
+          offsetsFound.add(o);
+        }
+      }
+    }
+    if (offsetsFound.size() != md.row_groups.size()) {
+      fail("missing row groups, "
+          + "found: " + offsetsFound
+          + "\nexpected " + md.getRow_groups());
+    }
+  }
+
+  private long fileSize(FileMetaData md) {
+    long size = 0;
+    for (RowGroup rg : md.getRow_groups()) {
+      size += rg.total_byte_size;
+    }
+    return size;
+  }
+
+  @Test
+  public void testFilterMetaData() {
+    verifyMD(filter(metadata(50, 50, 50), 0, 50), 0);
+    verifyMD(filter(metadata(50, 50, 50), 50, 100), 50);
+    verifyMD(filter(metadata(50, 50, 50), 100, 150), 100);
+    // picks up first RG
+    verifyMD(filter(metadata(50, 50, 50), 25, 75), 0);
+    // picks up no RG
+    verifyMD(filter(metadata(50, 50, 50), 26, 75));
+    // picks up second RG
+    verifyMD(filter(metadata(50, 50, 50), 26, 76), 50);
+
+    verifyAllFilters(metadata(50, 50, 50), 10);
+    verifyAllFilters(metadata(50, 50, 50), 51);
+    verifyAllFilters(metadata(50, 50, 50), 25); // corner cases are in the middle
+    verifyAllFilters(metadata(50, 50, 50), 24);
+    verifyAllFilters(metadata(50, 50, 50), 26);
+    verifyAllFilters(metadata(50, 50, 50), 110);
+    verifyAllFilters(metadata(10, 50, 500), 110);
+    verifyAllFilters(metadata(10, 50, 500), 10);
+    verifyAllFilters(metadata(10, 50, 500), 600);
+    verifyAllFilters(metadata(11, 9, 10), 10);
+    verifyAllFilters(metadata(11, 9, 10), 9);
+    verifyAllFilters(metadata(11, 9, 10), 8);
+  }
+
+  @Test
+  public void randomTestFilterMetaData() {
+    // randomized property based testing
+    // if it fails add the case above
+    Random random = new Random(System.currentTimeMillis());
+    for (int j = 0; j < 100; j++) {
+      long[] rgs = new long[random.nextInt(50)];
+      for (int i = 0; i < rgs.length; i++) {
+        rgs[i] = random.nextInt(10000) + 1; // No empty row groups
+      }
+      int splitSize = random.nextInt(10000);
+      try {
+        verifyAllFilters(metadata(rgs), splitSize);
+      } catch (AssertionError e) {
+        throw new AssertionError("fail verifyAllFilters(metadata(" + Arrays.toString(rgs) + "), " + splitSize + ")", e);
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
index 696c7b3..936c7c7 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
@@ -110,7 +110,7 @@ public class DeprecatedInputFormatTest {
   }
 
   @Test
-  public void testReadWriteWithCounteDeprecated() throws Exception {
+  public void testReadWriteWithCountDeprecated() throws Exception {
     runMapReduceJob(CompressionCodecName.GZIP);
     assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue() > 0L);
     assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue() > 0L);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
index 1823716..bac4fb0 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
@@ -15,13 +15,28 @@
  */
 package parquet.hadoop;
 
+import static java.util.Collections.unmodifiableMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -54,18 +69,6 @@ import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static parquet.filter2.predicate.FilterApi.and;
-import static parquet.filter2.predicate.FilterApi.eq;
-import static parquet.filter2.predicate.FilterApi.intColumn;
-import static parquet.filter2.predicate.FilterApi.not;
-import static parquet.filter2.predicate.FilterApi.notEq;
-import static parquet.filter2.predicate.FilterApi.or;
-
 public class TestInputFormat {
 
   List<BlockMetaData> blocks;
@@ -86,7 +89,6 @@ public class TestInputFormat {
     for (int i = 0; i < 10; i++) {
       blocks.add(newBlock(i * 10, 10));
     }
-    fileStatus = new FileStatus(100, false, 2, 50, 0, new Path("hdfs://foo.namenode:1234/bar"));
     schema = MessageTypeParser.parseMessageType("message doc { required binary foo; }");
     fileMetaData = new FileMetaData(schema, new HashMap<String, String>(), "parquet-mr");
   }
@@ -103,6 +105,17 @@ public class TestInputFormat {
   }
 
   @Test
+  public void testThrowExceptionWhenMaxSplitSizeIsSmallerThanMinSplitSizeTaskSide() throws IOException {
+    try {
+      generateTSSplitByMinMaxSize(50, 49);
+      fail("should throw exception when max split size is smaller than the min split size");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = 49; minSplitSize is 50"
+              , e.getMessage());
+    }
+  }
+
+  @Test
   public void testThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
     try {
       generateSplitByMinMaxSize(-100, -50);
@@ -114,6 +127,17 @@ public class TestInputFormat {
   }
 
   @Test
+  public void testTSThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
+    try {
+      generateTSSplitByMinMaxSize(-100, -50);
+      fail("should throw exception when max split size is negative");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = -50; minSplitSize is -100"
+              , e.getMessage());
+    }
+  }
+
+  @Test
   public void testGetFilter() throws IOException {
     IntColumn intColumn = intColumn("foo");
     FilterPredicate p = or(eq(intColumn, 7), eq(intColumn, 12));
@@ -151,6 +175,21 @@ public class TestInputFormat {
   }
 
   @Test
+  public void testTSGenerateSplitsAlignedWithHDFSBlock() throws IOException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(50, 50);
+    shouldSplitStartBe(splits, 0, 50);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+    splits = generateSplitByMinMaxSize(0, Long.MAX_VALUE);
+    shouldSplitStartBe(splits, 0, 50);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+  }
+
+  @Test
   public void testRowGroupNotAlignToHDFSBlock() throws IOException {
     //Test HDFS blocks size(51) is not multiple of row group size(10)
     withHDFSBlockSize(51, 51);
@@ -179,6 +218,32 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 40, 50, 10);
   }
 
+  @Test
+  public void testTSRowGroupNotAlignToHDFSBlock() throws IOException {
+    //Test HDFS blocks size(51) is not multiple of row group size(10)
+    withHDFSBlockSize(51, 51);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(50, 50);
+    shouldSplitStartBe(splits, 0, 50, 100);
+    shouldSplitLocationBe(splits, 0, 1, 1);
+    shouldSplitLengthBe(splits, 50, 50, 2);
+
+    //Test a rowgroup is greater than the hdfsBlock boundary
+    withHDFSBlockSize(49, 49);
+    splits = generateTSSplitByMinMaxSize(50, 50);
+    shouldSplitStartBe(splits, 0, 50);
+    shouldSplitLocationBe(splits, 1, 1);
+    shouldSplitLengthBe(splits, 50, 48);
+
+    /*
+    aaaa bbbbb c
+     */
+    withHDFSBlockSize(44,44,44);
+    splits = generateTSSplitByMinMaxSize(40, 50);
+    shouldSplitStartBe(splits, 0, 44, 88);
+    shouldSplitLocationBe(splits, 0, 1, 2);
+    shouldSplitLengthBe(splits, 44, 44, 44);
+  }
+
   /*
     when min size is 55, max size is 56, the first split will be generated with 6 row groups(size of 10 each), which satisfies split.size>min.size, but not split.size<max.size
     aaaaa abbbb
@@ -205,6 +270,28 @@ public class TestInputFormat {
 
   }
 
+  @Test
+  public void testTSGenerateSplitsNotAlignedWithHDFSBlock() throws IOException, InterruptedException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(55, 56);
+    shouldSplitStartBe(splits, 0, 56);
+    shouldSplitLocationBe(splits, 1, 1);
+    shouldSplitLengthBe(splits, 56, 44);
+
+    withHDFSBlockSize(51, 51);
+    splits = generateTSSplitByMinMaxSize(55, 56);
+    shouldSplitStartBe(splits, 0, 56);
+    shouldSplitLocationBe(splits, 1, 1);
+    shouldSplitLengthBe(splits, 56, 46);
+
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateTSSplitByMinMaxSize(55, 56);
+    shouldSplitStartBe(splits, 0, 56, 112);
+    shouldSplitLocationBe(splits, 1, 2, 2);
+    shouldSplitLengthBe(splits, 56, 56, 35);
+
+  }
+
   /*
     when the max size is set to be 30, first split will be of size 30,
     and when creating second split, it will try to align it to second hdfsBlock, and therefore generates a split of size 20
@@ -237,6 +324,33 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 30, 20, 30, 20);
   }
 
+  @Test
+  public void testTSGenerateSplitsSmallerThanMaxSizeAndAlignToHDFS() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(18, 30);
+    shouldSplitStartBe(splits, 0, 30, 50, 80);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+
+    /*
+    aaabb cccdd
+         */
+    withHDFSBlockSize(51, 51);
+    splits = generateTSSplitByMinMaxSize(18, 30);
+    shouldSplitStartBe(splits, 0, 30, 51, 81);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 21, 30, 21);
+
+    /*
+    aaabb cccdd
+     */
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateTSSplitByMinMaxSize(18, 30);
+    shouldSplitStartBe(splits, 0, 30, 49, 79, 98, 128);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1, 2, 2);
+    shouldSplitLengthBe(splits, 30, 19, 30, 19, 30, 19);
+  }
+
   /*
     when the min size is set to be 25, so the second split can not be aligned with the boundary of hdfs block, there for split of size 30 will be created as the 3rd split.
     aaabb bcccd
@@ -250,6 +364,15 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 30, 30, 30, 10);
   }
 
+  @Test
+  public void testTSGenerateSplitsCrossHDFSBlockBoundaryToSatisfyMinSize() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(25, 30);
+    shouldSplitStartBe(splits, 0, 30, 60, 90);
+    shouldSplitLocationBe(splits, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 30, 30, 30, 10);
+  }
+
   /*
     when rowGroups size is 10, but min split size is 10, max split size is 18, it will create splits of size 20 and of size 10 and align with hdfsBlocks
     aabbc ddeef
@@ -285,6 +408,37 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
   }
 
+  @Test
+  public void testTSMultipleRowGroupsInABlockToAlignHDFSBlock() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(10, 18);
+    shouldSplitStartBe(splits, 0, 18, 36, 50, 68, 86);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 18, 18, 14, 18, 18, 14);
+
+    /*
+    aabbc ddeef
+    notice the first byte of split d is in the first hdfs block:
+    when adding the 6th row group, although the first byte of it is in the first hdfs block
+    , but the mid point of the row group is in the second hdfs block, there for a new split(d) is created including that row group
+     */
+    withHDFSBlockSize(51, 51);
+    splits = generateTSSplitByMinMaxSize(10, 18);
+    shouldSplitStartBe(splits, 0, 18, 36, 51, 69, 87);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 18, 18, 15, 18, 18, 15);
+
+    /*
+    aabbc ddeef
+    same as the case where block sizes are 50 50
+     */
+    withHDFSBlockSize(49, 49);
+    splits = generateTSSplitByMinMaxSize(10, 18);
+    shouldSplitStartBe(splits, 0, 18, 36, 49, 67, 85);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 18, 18, 13, 18, 18, 13);
+  }
+
   public static final class DummyUnboundRecordFilter implements UnboundRecordFilter {
     @Override
     public RecordFilter bind(Iterable<ColumnReader> readers) {
@@ -379,33 +533,63 @@ public class TestInputFormat {
     return cacheValue;
   }
 
+  private static final Map<String, String> extramd;
+  static {
+    Map<String, String> md = new HashMap<String, String>();
+    md.put("specific", "foo");
+    extramd = unmodifiableMap(md);
+  }
+
   private List<ParquetInputSplit> generateSplitByMinMaxSize(long min, long max) throws IOException {
-    return ParquetInputFormat.generateSplits(
-            blocks, hdfsBlocks, fileStatus, fileMetaData, schema.toString(), new HashMap<String, String>() {{
-              put("specific", "foo");
-            }}, min, max
-    );
+    return ClientSideMetadataSplitStrategy.generateSplits(
+        blocks, hdfsBlocks,
+        fileStatus,
+        schema.toString(),
+        extramd,
+        min, max);
+  }
+
+  private List<ParquetInputSplit> generateTSSplitByMinMaxSize(long min, long max) throws IOException {
+    return TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(
+        hdfsBlocks,
+        fileStatus,
+        schema.toString(),
+        extramd,
+        min, max);
+  }
+
+  private void shouldSplitStartBe(List<ParquetInputSplit> splits, long... offsets) {
+    assertEquals(message(splits), offsets.length, splits.size());
+    for (int i = 0; i < offsets.length; i++) {
+      assertEquals(message(splits) + i, offsets[i], splits.get(i).getStart());
+    }
   }
 
   private void shouldSplitBlockSizeBe(List<ParquetInputSplit> splits, int... sizes) {
-    assertEquals(sizes.length, splits.size());
+    assertEquals(message(splits), sizes.length, splits.size());
     for (int i = 0; i < sizes.length; i++) {
-      assertEquals(sizes[i], splits.get(i).getBlocks().size());
-      assertEquals("foo", splits.get(i).getReadSupportMetadata().get("specific"));
+      assertEquals(message(splits) + i, sizes[i], splits.get(i).getRowGroupOffsets().length);
     }
   }
 
   private void shouldSplitLocationBe(List<ParquetInputSplit> splits, int... locations) throws IOException {
-    assertEquals(locations.length, splits.size());
+    assertEquals(message(splits), locations.length, splits.size());
     for (int i = 0; i < locations.length; i++) {
-      assertEquals("[foo" + locations[i] + ".datanode, bar" + locations[i] + ".datanode]", Arrays.toString(splits.get(i).getLocations()));
+      int loc = locations[i];
+      ParquetInputSplit split = splits.get(i);
+      assertEquals(message(splits) + i, "foo", split.getReadSupportMetadata().get("specific"));
+      assertEquals(message(splits) + i, "[foo" + loc + ".datanode, bar" + loc + ".datanode]", Arrays.toString(split.getLocations()));
     }
   }
 
+  private String message(List<ParquetInputSplit> splits) {
+    return String.valueOf(splits) + " " + Arrays.toString(hdfsBlocks) + "\n";
+  }
+
   private void shouldSplitLengthBe(List<ParquetInputSplit> splits, int... lengths) {
-    assertEquals(lengths.length, splits.size());
+    assertEquals(message(splits), lengths.length, splits.size());
     for (int i = 0; i < lengths.length; i++) {
-      assertEquals(lengths[i], splits.get(i).getLength());
+      assertEquals(message(splits) + i, lengths[i], splits.get(i).getLength());
     }
   }
 
@@ -417,6 +601,7 @@ public class TestInputFormat {
       hdfsBlocks[i] = new BlockLocation(new String[0], new String[]{"foo" + i + ".datanode", "bar" + i + ".datanode"}, offset, blockSize);
       offset += blockSize;
     }
+    fileStatus = new FileStatus(offset, false, 2, 50, 0, new Path("hdfs://foo.namenode:1234/bar"));
   }
 
   private BlockMetaData newBlock(long start, long compressedBlockSize) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
index df593b0..908d2b1 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java
@@ -278,16 +278,17 @@ public class TestParquetFileWriter {
     createFile(configuration, new Path(testDirPath, "part2"), schema);
 
     FileStatus outputStatus = fs.getFileStatus(testDirPath);
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus);
+    List<Footer> footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
     validateFooters(footers);
     ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers);
 
-    footers = ParquetFileReader.readFooters(configuration, outputStatus);
+    footers = ParquetFileReader.readFooters(configuration, outputStatus, false);
     validateFooters(footers);
-    footers = ParquetFileReader.readFooters(configuration, fs.getFileStatus(new Path(testDirPath, "part0")));
+    footers = ParquetFileReader.readFooters(configuration, fs.getFileStatus(new Path(testDirPath, "part0")), false);
     assertEquals(1, footers.size());
 
     final FileStatus metadataFile = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_METADATA_FILE));
+    final FileStatus metadataFileLight = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE));
     final List<Footer> metadata = ParquetFileReader.readSummaryFile(configuration, metadataFile);
 
     validateFooters(metadata);
@@ -297,19 +298,20 @@ public class TestParquetFileWriter {
       public boolean accept(Path p) {
         return !p.getName().startsWith("_");
       }
-    })));
+    })), false);
     validateFooters(footers);
 
     fs.delete(metadataFile.getPath(), false);
+    fs.delete(metadataFileLight.getPath(), false);
 
-    footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath)));
+    footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath)), false);
     validateFooters(footers);
 
   }
 
   private void validateFooters(final List<Footer> metadata) {
     LOG.debug(metadata);
-    assertEquals(3, metadata.size());
+    assertEquals(String.valueOf(metadata), 3, metadata.size());
     for (Footer footer : metadata) {
       final File file = new File(footer.getFile().toUri());
       assertTrue(file.getName(), file.getName().startsWith("part"));

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java
deleted file mode 100644
index f94b90a..0000000
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class TestParquetInputSplit {
-
-    @Test
-    public void testStringCompression() {
-      String[] strings = {"this is a string",
-         "this is a string with a \n newline",
-         "a:chararray, b:{t:(c:chararray, d:chararray)}",
-         "message pig_schema {\n" +
-         "  optional binary a;\n" +
-         "  optional group b {\n" +
-         "    repeated group t {\n" +
-         "      optional binary c;\n" +
-         "      optional binary d;\n" +
-         "    }\n" +
-         "  }\n" +
-         "}\n"
-         };
-      ParquetInputSplit split = new ParquetInputSplit();
-      for (String s : strings) {
-        byte[] bytes = split.compressString(s);
-        String uncs = split.decompressString(bytes);
-        assertEquals("strings should be same after decompressing", s, uncs);
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
index a541a9f..e65f03c 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -152,9 +153,14 @@ public class TestInputOutputFormat {
       context.write(new LongWritable(value.getInteger("line", 0)), new Text("dummy"));
     }
   }
-
   private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-
+    runMapReduceJob(codec, Collections.<String, String>emptyMap());
+  }
+  private void runMapReduceJob(CompressionCodecName codec, Map<String, String> extraConf) throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = new Configuration(this.conf);
+    for (Map.Entry<String, String> entry : extraConf.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
     final FileSystem fileSystem = parquetPath.getFileSystem(conf);
     fileSystem.delete(parquetPath, true);
     fileSystem.delete(outputPath, true);
@@ -193,7 +199,10 @@ public class TestInputOutputFormat {
   }
 
   private void testReadWrite(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-    runMapReduceJob(codec);
+    testReadWrite(codec, Collections.<String, String>emptyMap());
+  }
+  private void testReadWrite(CompressionCodecName codec, Map<String, String> conf) throws IOException, ClassNotFoundException, InterruptedException {
+    runMapReduceJob(codec, conf);
     final BufferedReader in = new BufferedReader(new FileReader(new File(inputPath.toString())));
     final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString(), "part-m-00000")));
     String lineIn;
@@ -204,8 +213,8 @@ public class TestInputOutputFormat {
       lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
       assertEquals("line " + lineNumber, lineIn, lineOut);
     }
-    assertNull("line " + lineNumber, lineIn);
     assertNull("line " + lineNumber, out.readLine());
+    assertNull("line " + lineNumber, lineIn);
     in.close();
     out.close();
   }
@@ -219,9 +228,14 @@ public class TestInputOutputFormat {
   }
 
   @Test
+  public void testReadWriteTaskSideMD() throws IOException, ClassNotFoundException, InterruptedException {
+    testReadWrite(CompressionCodecName.UNCOMPRESSED, new HashMap<String, String>() {{ put("parquet.task.side.metadata", "true"); }});
+  }
+
+  @Test
   public void testProjection() throws Exception{
     readSchema=partialSchema;
-    writeMapperClass=PartialWriteMapper.class;
+    writeMapperClass = PartialWriteMapper.class;
     runMapReduceJob(CompressionCodecName.GZIP);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 9e98781..796a0f5 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -13,9 +13,9 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.read;
 
+import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,7 +35,6 @@ import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.ParquetInputSplit;
 import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.FileMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
@@ -194,45 +193,33 @@ public class ParquetRecordReaderWrapper  implements RecordReader<Void, ArrayWrit
       final InputSplit oldSplit,
       final JobConf conf
       ) throws IOException {
-    ParquetInputSplit split;
     if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
+      FileSplit fileSplit = (FileSplit) oldSplit;
+      final long splitStart = fileSplit.getStart();
+      final long splitLength = fileSplit.getLength();
+      final Path finalPath = fileSplit.getPath();
       final JobConf cloneJob = hiveBinding.pushProjectionsAndFilters(conf, finalPath.getParent());
 
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath, SKIP_ROW_GROUPS);
       final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadContext readContext = new DataWritableReadSupport()
-          .init(cloneJob, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());
-      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-          .get(DataWritableReadSupport.HIVE_SCHEMA_KEY)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
-          splitGroup.add(block);
-        }
-      }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
-        split = null;
-      } else {
-        split = new ParquetInputSplit(finalPath,
+      final ReadContext readContext =
+          new DataWritableReadSupport()
+            .init(cloneJob, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());
+
+      schemaSize = MessageTypeParser.parseMessageType(
+            readContext.getReadSupportMetadata().get(DataWritableReadSupport.HIVE_SCHEMA_KEY)
+          ).getFieldCount();
+       return new ParquetInputSplit(
+                finalPath,
                 splitStart,
+                splitStart + splitLength,
                 splitLength,
-                ((FileSplit) oldSplit).getLocations(),
-                splitGroup,
+                fileSplit.getLocations(),
+                null,
                 readContext.getRequestedSchema().toString(),
-                fileMetaData.getSchema().toString(),
-                fileMetaData.getKeyValueMetaData(),
                 readContext.getReadSupportMetadata());
-      }
     } else {
       throw new IllegalArgumentException("Unknown split type: " + oldSplit);
     }
-    return split;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
index 91dc0da..2e96ecf 100644
--- a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
@@ -98,34 +98,34 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   public ParquetLoader(String requestedSchemaStr) {
     this(parsePigSchema(requestedSchemaStr), false);
   }
-  
+
   /**
-   * To read only a subset of the columns in the file optionally assigned by 
+   * To read only a subset of the columns in the file optionally assigned by
    * column positions.  Using column positions allows for renaming the fields
    * and is more inline with the "schema-on-read" approach to accessing file
    * data.
-   * 
-   * Example: 
+   *
+   * Example:
    * File Schema:  'c1:int, c2:float, c3:double, c4:long'
    * ParquetLoader('n1:int, n2:float, n3:double, n4:long', 'true');
-   * 
+   *
    * This will use the names provided in the requested schema and assign them
    * to column positions indicated by order.
-   * 
+   *
    * @param requestedSchemaStr a subset of the original pig schema in the file
    * @param columnIndexAccess use column index positions as opposed to name (default: false)
    */
   public ParquetLoader(String requestedSchemaStr, String columnIndexAccess) {
     this(parsePigSchema(requestedSchemaStr), Boolean.parseBoolean(columnIndexAccess));
   }
-  
+
   /**
    * Use the provided schema to access the underlying file data.
-   * 
+   *
    * The same as the string based constructor but for programmatic use.
-   * 
+   *
    * @param requestedSchema a subset of the original pig schema in the file
-   * @param columnIndexAccess  
+   * @param columnIndexAccess
    */
   public ParquetLoader(Schema requestedSchema, boolean columnIndexAccess) {
     this.requestedSchema = requestedSchema;
@@ -135,7 +135,7 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   @Override
   public void setLocation(String location, Job job) throws IOException {
     if (DEBUG) LOG.debug("LoadFunc.setLocation(" + location + ", " + job + ")");
-    
+
     setInput(location, job);
   }
 
@@ -143,25 +143,25 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
     this.setLocationHasBeenCalled  = true;
     this.location = location;
     setInputPaths(job, location);
-    
+
     //This is prior to load because the initial value comes from the constructor
     //not file metadata or pig framework and would get overwritten in initSchema().
     if(UDFContext.getUDFContext().isFrontend()) {
       storeInUDFContext(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
     }
-    
+
     schema = PigSchemaConverter.parsePigSchema(getPropertyFromUDFContext(PARQUET_PIG_SCHEMA));
     requiredFieldList = PigSchemaConverter.deserializeRequiredFieldList(getPropertyFromUDFContext(PARQUET_PIG_REQUIRED_FIELDS));
     columnIndexAccess = Boolean.parseBoolean(getPropertyFromUDFContext(PARQUET_COLUMN_INDEX_ACCESS));
-    
+
     initSchema(job);
-    
+
     if(UDFContext.getUDFContext().isFrontend()) {
       //Setting for task-side loading via initSchema()
       storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
       storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
     }
-    
+
     //Used by task-side loader via TupleReadSupport
     getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
     getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
@@ -335,14 +335,14 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
       throws FrontendException {
     this.requiredFieldList = requiredFieldList;
-    
+
     if (requiredFieldList == null)
       return null;
-    
+
     schema = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
     storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
     storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
-    
+
     return new RequiredFieldResponse(true);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
index 4076134..7f07be7 100644
--- a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
+++ b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
@@ -69,18 +69,18 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
    */
   static RequiredFieldList getRequiredFields(Configuration configuration) {
     String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
-    
+
     if(requiredFieldString == null) {
       return null;
     }
-    
+
     try {
       return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
     } catch (IOException iOException) {
       throw new RuntimeException("Failed to deserialize pushProjection");
     }
   }
-  
+
   /**
    * @param fileSchema the parquet schema from the file
    * @param keyValueMetaData the extra meta data from the files
@@ -156,13 +156,13 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
     Schema pigSchema = getPigSchema(initContext.getConfiguration());
     RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration());
     boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
-    
+
     if (pigSchema == null) {
       return new ReadContext(initContext.getFileSchema());
     } else {
-      
+
       // project the file schema according to the requested Pig schema
-      MessageType parquetRequestedSchema = new PigSchemaConverter(columnIndexAccess).filter(initContext.getFileSchema(), pigSchema, requiredFields);;
+      MessageType parquetRequestedSchema = new PigSchemaConverter(columnIndexAccess).filter(initContext.getFileSchema(), pigSchema, requiredFields);
       return new ReadContext(parquetRequestedSchema);
     }
   }
@@ -175,7 +175,7 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
       ReadContext readContext) {
     MessageType requestedSchema = readContext.getRequestedSchema();
     Schema requestedPigSchema = getPigSchema(configuration);
-    
+
     if (requestedPigSchema == null) {
       throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf");
     }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7b2e59f..65969e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@
     <shade.prefix>parquet</shade.prefix>
     <hadoop.version>1.1.0</hadoop.version>
     <cascading.version>2.5.3</cascading.version>
-    <parquet.format.version>2.1.0</parquet.format.version>
+    <parquet.format.version>2.2.0-rc1</parquet.format.version>
     <log4j.version>1.2.17</log4j.version>
     <previous.version>1.6.0rc1</previous.version>
     <thrift.executable>thrift</thrift.executable>
@@ -209,6 +209,7 @@
                    <previousVersion>${previous.version}</previousVersion>
                    <excludes>
                      <exclude>parquet/org/**</exclude>
+                     <exclude>parquet/hadoop/ParquetInputSplit</exclude>
                    </excludes>
                  </requireBackwardCompatibility>
                </rules>


[2/2] git commit: PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.

Posted by ti...@apache.org.
PARQUET-84: Avoid reading rowgroup metadata in memory on the client side.

This improves reading big datasets with a large schema (thousands of columns).
Instead, rowgroup metadata can be read in the tasks, where each task reads only the metadata of the file it is reading.
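
For readers following along, here is a minimal sketch (not part of this commit) of how a job could opt into the task-side metadata path. The only assumption beyond standard Hadoop APIs is the "parquet.task.side.metadata" configuration key, which is the key exercised by the TestInputOutputFormat change in this diff; with it set, the client skips per-rowgroup metadata and each ParquetRecordReader reads its own file footer restricted to the split range (see the readFooter(..., range(split.getStart(), split.getEnd())) call in the ParquetRecordReader hunk above).

import org.apache.hadoop.conf.Configuration;

public class TaskSideMetadataExample {

  /**
   * Returns a Configuration that asks Parquet to defer reading row group
   * metadata to the tasks instead of loading it on the submitting client.
   */
  public static Configuration taskSideMetadataConf() {
    Configuration conf = new Configuration();
    // Illustrative use of the flag introduced by this change: when true,
    // splits carry no row group offsets and each task filters its file's
    // footer to the [split.start, split.end) byte range.
    conf.set("parquet.task.side.metadata", "true");
    return conf;
  }
}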

Author: julien <ju...@twitter.com>

Closes #45 from julienledem/skip_reading_row_groups and squashes the following commits:

ccdd08c [julien] fix parquet-hive
24a2050 [julien] Merge branch 'master' into skip_reading_row_groups
3d7e35a [julien] adress review feedback
5b6bd1b [julien] more tests
323d254 [julien] sdd unit tests
f599259 [julien] review feedback
fb11f02 [julien] fix backward compatibility check
2c20b46 [julien] cleanup readFooters methods
3da37d8 [julien] fix read summary
ab95a45 [julien] cleanup
4d16df3 [julien] implement task side metadata
9bb8059 [julien] first stab at integrating skipping row groups


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/5dafd127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/5dafd127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/5dafd127

Branch: refs/heads/master
Commit: 5dafd127f3de7c516ce9c1b7329087a01ab2fc57
Parents: 647b8a7
Author: julien <ju...@twitter.com>
Authored: Fri Sep 5 11:32:46 2014 -0700
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Fri Sep 5 11:32:46 2014 -0700

----------------------------------------------------------------------
 .../converter/ParquetMetadataConverter.java     | 181 +++++-
 .../java/parquet/hadoop/ParquetFileReader.java  | 213 +++++--
 .../java/parquet/hadoop/ParquetFileWriter.java  |  31 +-
 .../java/parquet/hadoop/ParquetInputFormat.java | 638 ++++++++++++-------
 .../java/parquet/hadoop/ParquetInputSplit.java  | 340 ++++------
 .../parquet/hadoop/ParquetOutputCommitter.java  |   1 -
 .../parquet/hadoop/ParquetOutputFormat.java     |   1 -
 .../main/java/parquet/hadoop/ParquetReader.java |   6 +-
 .../parquet/hadoop/ParquetRecordReader.java     |  70 +-
 .../main/java/parquet/hadoop/PrintFooter.java   |   3 +-
 .../mapred/DeprecatedParquetInputFormat.java    |  30 +-
 .../mapred/DeprecatedParquetOutputFormat.java   |  17 +-
 .../converter/TestParquetMetadataConverter.java | 128 +++-
 .../hadoop/DeprecatedInputFormatTest.java       |   2 +-
 .../java/parquet/hadoop/TestInputFormat.java    | 235 ++++++-
 .../parquet/hadoop/TestParquetFileWriter.java   |  14 +-
 .../parquet/hadoop/TestParquetInputSplit.java   |  45 --
 .../hadoop/example/TestInputOutputFormat.java   |  24 +-
 .../read/ParquetRecordReaderWrapper.java        |  51 +-
 .../main/java/parquet/pig/ParquetLoader.java    |  38 +-
 .../main/java/parquet/pig/TupleReadSupport.java |  14 +-
 pom.xml                                         |   3 +-
 22 files changed, 1361 insertions(+), 724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index 5bd6869..76834d5 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import parquet.Log;
 import parquet.common.schema.ColumnPath;
 import parquet.format.ColumnChunk;
+import parquet.format.ColumnMetaData;
 import parquet.format.ConvertedType;
 import parquet.format.DataPageHeader;
 import parquet.format.DictionaryPageHeader;
@@ -58,7 +59,7 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
 import parquet.schema.TypeVisitor;
 import parquet.schema.Types;
-
+import static java.lang.Math.min;
 import static parquet.format.Util.readFileMetaData;
 import static parquet.format.Util.writePageHeader;
 
@@ -340,8 +341,124 @@ public class ParquetMetadataConverter {
     fileMetaData.addToKey_value_metadata(keyValue);
   }
 
+  private static interface MetadataFilterVisitor<T, E extends Throwable> {
+    T visit(NoFilter filter) throws E;
+    T visit(SkipMetadataFilter filter) throws E;
+    T visit(RangeMetadataFilter filter) throws E;
+  }
+
+  public abstract static class MetadataFilter {
+    private MetadataFilter() {}
+    abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
+  }
+  public static final MetadataFilter NO_FILTER = new NoFilter();
+  public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
+  /**
+   * [ startOffset, endOffset )
+   * @param startOffset
+   * @param endOffset
+   * @return the filter
+   */
+  public static final MetadataFilter range(long startOffset, long endOffset) {
+    return new RangeMetadataFilter(startOffset, endOffset);
+  }
+  private static final class NoFilter extends MetadataFilter {
+    private NoFilter() {}
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    @Override
+    public String toString() {
+      return "NO_FILTER";
+    }
+  }
+  private static final class SkipMetadataFilter extends MetadataFilter {
+    private SkipMetadataFilter() {}
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    @Override
+    public String toString() {
+      return "SKIP_ROW_GROUPS";
+    }
+  }
+  /**
+   * [ startOffset, endOffset )
+   * @author Julien Le Dem
+   */
+  static final class RangeMetadataFilter extends MetadataFilter {
+    final long startOffset;
+    final long endOffset;
+    RangeMetadataFilter(long startOffset, long endOffset) {
+      super();
+      this.startOffset = startOffset;
+      this.endOffset = endOffset;
+    }
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    boolean contains(long offset) {
+      return offset >= this.startOffset && offset < this.endOffset;
+    }
+    @Override
+    public String toString() {
+      return "range(s:" + startOffset + ", e:" + endOffset + ")";
+    }
+  }
+
+  @Deprecated
   public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
-    FileMetaData fileMetaData = readFileMetaData(from);
+    return readParquetMetadata(from, NO_FILTER);
+  }
+
+  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
+    List<RowGroup> rowGroups = metaData.getRow_groups();
+    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+    for (RowGroup rowGroup : rowGroups) {
+      long totalSize = 0;
+      long startIndex = getOffset(rowGroup.getColumns().get(0));
+      for (ColumnChunk col : rowGroup.getColumns()) {
+        totalSize += col.getMeta_data().getTotal_compressed_size();
+      }
+      long midPoint = startIndex + totalSize / 2;
+      if (filter.contains(midPoint)) {
+        newRowGroups.add(rowGroup);
+      }
+    }
+    metaData.setRow_groups(newRowGroups);
+    return metaData;
+  }
+
+  static long getOffset(RowGroup rowGroup) {
+    return getOffset(rowGroup.getColumns().get(0));
+  }
+  static long getOffset(ColumnChunk columnChunk) {
+    ColumnMetaData md = columnChunk.getMeta_data();
+    long offset = md.getData_page_offset();
+    if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
+      offset = md.getDictionary_page_offset();
+    }
+    return offset;
+  }
+
+  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
+    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+      @Override
+      public FileMetaData visit(NoFilter filter) throws IOException {
+        return readFileMetaData(from);
+      }
+      @Override
+      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
+        return readFileMetaData(from, true);
+      }
+      @Override
+      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
+        return filterFileMetaData(readFileMetaData(from), filter);
+      }
+    });
     if (Log.DEBUG) LOG.debug(fileMetaData);
     ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
     if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
@@ -352,37 +469,39 @@ public class ParquetMetadataConverter {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
-    for (RowGroup rowGroup : row_groups) {
-      BlockMetaData blockMetaData = new BlockMetaData();
-      blockMetaData.setRowCount(rowGroup.getNum_rows());
-      blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
-      List<ColumnChunk> columns = rowGroup.getColumns();
-      String filePath = columns.get(0).getFile_path();
-      for (ColumnChunk columnChunk : columns) {
-        if ((filePath == null && columnChunk.getFile_path() != null)
-            || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
-          throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
+    if (row_groups != null) {
+      for (RowGroup rowGroup : row_groups) {
+        BlockMetaData blockMetaData = new BlockMetaData();
+        blockMetaData.setRowCount(rowGroup.getNum_rows());
+        blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        List<ColumnChunk> columns = rowGroup.getColumns();
+        String filePath = columns.get(0).getFile_path();
+        for (ColumnChunk columnChunk : columns) {
+          if ((filePath == null && columnChunk.getFile_path() != null)
+              || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
+            throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
+          }
+          parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
+          ColumnPath path = getPath(metaData);
+          ColumnChunkMetaData column = ColumnChunkMetaData.get(
+              path,
+              messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
+              CompressionCodecName.fromParquet(metaData.codec),
+              fromFormatEncodings(metaData.encodings),
+              fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
+              metaData.data_page_offset,
+              metaData.dictionary_page_offset,
+              metaData.num_values,
+              metaData.total_compressed_size,
+              metaData.total_uncompressed_size);
+          // TODO
+          // index_page_offset
+          // key_value_metadata
+          blockMetaData.addColumn(column);
         }
-        parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
-        ColumnPath path = getPath(metaData);
-        ColumnChunkMetaData column = ColumnChunkMetaData.get(
-            path,
-            messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
-            CompressionCodecName.fromParquet(metaData.codec),
-            fromFormatEncodings(metaData.encodings),
-            fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
-            metaData.data_page_offset,
-            metaData.dictionary_page_offset,
-            metaData.num_values,
-            metaData.total_compressed_size,
-            metaData.total_uncompressed_size);
-        // TODO
-        // index_page_offset
-        // key_value_metadata
-        blockMetaData.addColumn(column);
+        blockMetaData.setPath(filePath);
+        blocks.add(blockMetaData);
       }
-      blockMetaData.setPath(filePath);
-      blocks.add(blockMetaData);
     }
     Map<String, String> keyValueMetaData = new HashMap<String, String>();
     List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();
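
For reference, the three filters added above control how much of the footer is materialized. A sketch of their use (here "in" is assumed to be a stream positioned at the start of the serialized FileMetaData, and a fresh stream is needed per call since readFileMetaData consumes it; "start" and "end" are assumed split offsets):

    ParquetMetadataConverter converter = new ParquetMetadataConverter();

    // full footer, including all row group metadata
    ParquetMetadata all = converter.readParquetMetadata(in, ParquetMetadataConverter.NO_FILTER);

    // schema and key/value metadata only; row groups are skipped entirely
    ParquetMetadata schemaOnly = converter.readParquetMetadata(in, ParquetMetadataConverter.SKIP_ROW_GROUPS);

    // only the row groups whose midpoint falls in [start, end), as computed by filterFileMetaData
    ParquetMetadata slice = converter.readParquetMetadata(in, ParquetMetadataConverter.range(start, end));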

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index e660c9f..49f1fab 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -15,13 +15,21 @@
  */
 package parquet.hadoop;
 
+import static parquet.Log.DEBUG;
+import static parquet.bytes.BytesUtils.readIntLittleEndian;
+import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static parquet.hadoop.ParquetFileWriter.*;
+
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.SequenceInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -39,6 +47,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.Utils;
 
 import parquet.Log;
@@ -51,6 +60,7 @@ import parquet.common.schema.ColumnPath;
 import parquet.format.PageHeader;
 import parquet.format.Util;
 import parquet.format.converter.ParquetMetadataConverter;
+import parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
 import parquet.hadoop.metadata.BlockMetaData;
@@ -59,11 +69,6 @@ import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.counters.BenchmarkCounter;
 import parquet.io.ParquetDecodingException;
 
-import static parquet.Log.DEBUG;
-import static parquet.bytes.BytesUtils.readIntLittleEndian;
-import static parquet.hadoop.ParquetFileWriter.MAGIC;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-
 /**
  * Internal implementation of the Parquet file reader as a block container
  *
@@ -84,7 +89,28 @@ public class ParquetFileReader implements Closeable {
    * @return the footers for those files using the summary file if possible.
    * @throws IOException
    */
-  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
+  @Deprecated
+  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
+    return readAllFootersInParallelUsingSummaryFiles(configuration, partFiles, false);
+  }
+
+  private static MetadataFilter filter(boolean skipRowGroups) {
+    return skipRowGroups ? SKIP_ROW_GROUPS : NO_FILTER;
+  }
+
+  /**
+   * for files provided, check if there's a summary file.
+   * If a summary file is found it is used otherwise the file footer is used.
+   * @param configuration the hadoop conf to connect to the file system;
+   * @param partFiles the part files to read
+   * @param skipRowGroups to skipRowGroups in the footers
+   * @return the footers for those files using the summary file if possible.
+   * @throws IOException
+   */
+  public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
+      final Configuration configuration,
+      final Collection<FileStatus> partFiles,
+      final boolean skipRowGroups) throws IOException {
 
     // figure out list of all parents to part files
     Set<Path> parents = new HashSet<Path>();
@@ -98,12 +124,17 @@ public class ParquetFileReader implements Closeable {
       summaries.add(new Callable<Map<Path, Footer>>() {
         @Override
         public Map<Path, Footer> call() throws Exception {
-          // fileSystem is thread-safe
-          FileSystem fileSystem = path.getFileSystem(configuration);
-          Path summaryFile = new Path(path, PARQUET_METADATA_FILE);
-          if (fileSystem.exists(summaryFile)) {
-            if (Log.INFO) LOG.info("reading summary file: " + summaryFile);
-            final List<Footer> footers = readSummaryFile(configuration, fileSystem.getFileStatus(summaryFile));
+          ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
+          if (mergedMetadata != null) {
+            final List<Footer> footers;
+            if (skipRowGroups) {
+              footers = new ArrayList<Footer>();
+              for (FileStatus f : partFiles) {
+                footers.add(new Footer(f.getPath(), mergedMetadata));
+              }
+            } else {
+              footers = footersFromSummaryFile(path, mergedMetadata);
+            }
             Map<Path, Footer> map = new HashMap<Path, Footer>();
             for (Footer footer : footers) {
               // the folder may have been moved
@@ -143,7 +174,7 @@ public class ParquetFileReader implements Closeable {
     if (toRead.size() > 0) {
       // read the footers of the files that did not have a summary file
       if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers");
-      result.addAll(readAllFootersInParallel(configuration, toRead));
+      result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
     }
 
     return result;
@@ -170,14 +201,28 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
+  @Deprecated
   public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
+    return readAllFootersInParallel(configuration, partFiles, false);
+  }
+
+  /**
+   * read all the footers of the files provided
+   * (not using summary files)
+   * @param configuration the conf to access the File System
+   * @param partFiles the files to read
+   * @param skipRowGroups to skip the rowGroup info
+   * @return the footers
+   * @throws IOException
+   */
+  public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
     List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
     for (final FileStatus currentFile : partFiles) {
       footers.add(new Callable<Footer>() {
         @Override
         public Footer call() throws Exception {
           try {
-            return new Footer(currentFile.getPath(), ParquetFileReader.readFooter(configuration, currentFile));
+            return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
           } catch (IOException e) {
             throw new IOException("Could not read footer for file " + currentFile, e);
           }
@@ -191,38 +236,103 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
+  /**
+   * Read the footers of all the files under that path (recursively)
+   * not using summary files.
+   * rowGroups are not skipped
+   * @param configuration the configuration to access the FS
+   * @param fileStatus the root dir
+   * @return all the footers
+   * @throws IOException
+   */
   public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
-    final FileSystem fs = fileStatus.getPath().getFileSystem(configuration);
-    List<FileStatus> statuses;
-    if (fileStatus.isDir()) {
-      statuses = Arrays.asList(fs.listStatus(fileStatus.getPath(), new Utils.OutputFileUtils.OutputFilesFilter()));
-    } else {
-      statuses = new ArrayList<FileStatus>();
-      statuses.add(fileStatus);
-    }
-    return readAllFootersInParallel(configuration, statuses);
+    List<FileStatus> statuses = listFiles(configuration, fileStatus);
+    return readAllFootersInParallel(configuration, statuses, false);
   }
 
+  @Deprecated
+  public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
+    return readFooters(configuration, status(configuration, path));
+  }
+
+  private static FileStatus status(Configuration configuration, Path path) throws IOException {
+    return path.getFileSystem(configuration).getFileStatus(path);
+  }
+
+  /**
+   * this always returns the row groups
+   * @param configuration
+   * @param pathStatus
+   * @return
+   * @throws IOException
+   */
+  @Deprecated
   public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
-    try {
-      if (pathStatus.isDir()) {
-        Path summaryPath = new Path(pathStatus.getPath(), PARQUET_METADATA_FILE);
-        FileSystem fs = summaryPath.getFileSystem(configuration);
-        if (fs.exists(summaryPath)) {
-          FileStatus summaryStatus = fs.getFileStatus(summaryPath);
-          return readSummaryFile(configuration, summaryStatus);
+    return readFooters(configuration, pathStatus, false);
+  }
+
+  /**
+   * Read the footers of all the files under that path (recursively)
+   * using summary files if possible
+   * @param configuration the configuration to access the FS
+   * @param fileStatus the root dir
+   * @return all the footers
+   * @throws IOException
+   */
+  public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
+    List<FileStatus> files = listFiles(configuration, pathStatus);
+    return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
+  }
+
+  private static List<FileStatus> listFiles(Configuration conf, FileStatus fileStatus) throws IOException {
+    if (fileStatus.isDir()) {
+      FileSystem fs = fileStatus.getPath().getFileSystem(conf);
+      FileStatus[] list = fs.listStatus(fileStatus.getPath(), new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          return !p.getName().startsWith("_") && !p.getName().startsWith(".");
         }
+      });
+      List<FileStatus> result = new ArrayList<FileStatus>();
+      for (FileStatus sub : list) {
+        result.addAll(listFiles(conf, sub));
       }
-    } catch (IOException e) {
-      LOG.warn("can not read summary file for " + pathStatus.getPath(), e);
+      return result;
+    } else {
+      return Arrays.asList(fileStatus);
     }
-    return readAllFootersInParallel(configuration, pathStatus);
-
   }
 
+  /**
+   * Specifically reads a given summary file
+   * @param configuration
+   * @param summaryStatus
+   * @return the metadata translated for each file
+   * @throws IOException
+   */
   public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
     final Path parent = summaryStatus.getPath().getParent();
-    ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus);
+    ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
+    return footersFromSummaryFile(parent, mergedFooters);
+  }
+
+  static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups) throws IOException {
+    Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
+    Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
+    FileSystem fileSystem = basePath.getFileSystem(configuration);
+    if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
+      // reading the summary file that does not contain the row groups
+      if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
+      return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
+    } else if (fileSystem.exists(metadataFile)) {
+      if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
+      return readFooter(configuration, metadataFile, filter(skipRowGroups));
+    } else {
+      return null;
+    }
+  }
+
+  static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
     Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
     List<BlockMetaData> blocks = mergedFooters.getBlocks();
     for (BlockMetaData block : blocks) {
@@ -249,25 +359,42 @@ public class ParquetFileReader implements Closeable {
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
    */
+  @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
-    FileSystem fileSystem = file.getFileSystem(configuration);
-    return readFooter(configuration, fileSystem.getFileStatus(file));
+    return readFooter(configuration, file, NO_FILTER);
   }
 
-
-  public static final List<Footer> readFooters(Configuration configuration, Path file) throws IOException {
+  /**
+   * Reads the meta data in the footer of the file.
+   * Skipping row groups (or not) based on the provided filter
+   * @param configuration
+   * @param file the Parquet File
+   * @param filter the filter to apply to row groups
+   * @return the metadata with row groups filtered.
+   * @throws IOException  if an error occurs while reading the file
+   */
+  public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
     FileSystem fileSystem = file.getFileSystem(configuration);
-    return readFooters(configuration, fileSystem.getFileStatus(file));
+    return readFooter(configuration, fileSystem.getFileStatus(file), filter);
+  }
+
+  /**
+   * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
+   */
+  @Deprecated
+  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
+    return readFooter(configuration, file, NO_FILTER);
   }
 
   /**
    * Reads the meta data block in the footer of the file
    * @param configuration
    * @param file the parquet File
+   * @param filter the filter to apply to row groups
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
    */
-  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
+  public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
     FileSystem fileSystem = file.getPath().getFileSystem(configuration);
     FSDataInputStream f = fileSystem.open(file.getPath());
     try {
@@ -293,7 +420,7 @@ public class ParquetFileReader implements Closeable {
         throw new RuntimeException("corrupted file: the footer index is not within the file");
       }
       f.seek(footerIndex);
-      return parquetMetadataConverter.readParquetMetadata(f);
+      return parquetMetadataConverter.readParquetMetadata(f, filter);
     } finally {
       f.close();
     }
@@ -430,7 +557,7 @@ public class ParquetFileReader implements Closeable {
                     this.readAsBytesInput(pageHeader.compressed_page_size),
                     pageHeader.data_page_header.num_values,
                     pageHeader.uncompressed_page_size,
-                    parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
+                    ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, descriptor.col.getType()),
                     parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                     parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                     parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
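
Taken together, the new reader entry points let callers choose how much footer to read. A sketch under assumed variables (conf, file, split, and partFiles are not from the patch):

    import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
    import static parquet.format.converter.ParquetMetadataConverter.range;

    // footer without row group metadata (cheap on the client side)
    ParquetMetadata summary = ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS);

    // footer restricted to the row groups overlapping this task's split (task side)
    ParquetMetadata forSplit = ParquetFileReader.readFooter(conf, file, range(split.getStart(), split.getEnd()));

    // footers for many part files, skipping row groups and using _common_metadata / _metadata when present
    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, partFiles, true);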

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index f3ef61b..42d91a4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -15,6 +15,9 @@
  */
 package parquet.hadoop;
 
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -49,9 +52,6 @@ import parquet.io.ParquetEncodingException;
 import parquet.schema.MessageType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
 /**
  * Internal implementation of the Parquet file writer as a block container
  *
@@ -62,6 +62,7 @@ public class ParquetFileWriter {
   private static final Log LOG = Log.getLog(ParquetFileWriter.class);
 
   public static final String PARQUET_METADATA_FILE = "_metadata";
+  public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
   public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
   public static final int CURRENT_VERSION = 1;
 
@@ -83,7 +84,7 @@ public class ParquetFileWriter {
   private long currentChunkFirstDataPage;
   private long currentChunkDictionaryPageOffset;
   private long currentChunkValueCount;
-  
+
   private Statistics currentStatistics;
 
   /**
@@ -387,19 +388,26 @@ public class ParquetFileWriter {
   }
 
   /**
-   * writes a _metadata file
+   * writes a _metadata and _common_metadata file
    * @param configuration the configuration to use to get the FileSystem
    * @param outputPath the directory to write the _metadata file to
    * @param footers the list of footers to merge
    * @throws IOException
    */
   public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
-    Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
+    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
     FileSystem fs = outputPath.getFileSystem(configuration);
     outputPath = outputPath.makeQualified(fs);
+    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+    metadataFooter.getBlocks().clear();
+    writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
+  }
+
+  private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+      throws IOException {
+    Path metaDataPath = new Path(outputPath, parquetMetadataFile);
     FSDataOutputStream metadata = fs.create(metaDataPath);
     metadata.write(MAGIC);
-    ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
     serializeFooter(metadataFooter, metadata);
     metadata.close();
   }
@@ -439,11 +447,10 @@ public class ParquetFileWriter {
    * @param footers the list files footers to merge
    * @return the global meta data for all the footers
    */
-  
   static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
     return getGlobalMetaData(footers, true);
   }
-  
+
   static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
     GlobalMetaData fileMetaData = null;
     for (Footer footer : footers) {
@@ -464,7 +471,7 @@ public class ParquetFileWriter {
       GlobalMetaData mergedMetadata) {
     return mergeInto(toMerge, mergedMetadata, true);
   }
-  
+
   static GlobalMetaData mergeInto(
       FileMetaData toMerge,
       GlobalMetaData mergedMetadata,
@@ -505,7 +512,7 @@ public class ParquetFileWriter {
   static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
     return mergeInto(toMerge, mergedSchema, true);
   }
-  
+
   /**
    * will return the result of merging toMerge into mergedSchema
    * @param toMerge the schema to merge into mergedSchema
@@ -517,7 +524,7 @@ public class ParquetFileWriter {
     if (mergedSchema == null) {
       return toMerge;
     }
-    
+
     return mergedSchema.union(toMerge, strict);
   }
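
From the caller's point of view the summary-writing call is unchanged; it now simply produces both files. A sketch (outputPath, conf, and the FileStatus lookup are assumptions for illustration):

    FileStatus outputStatus = outputPath.getFileSystem(conf).getFileStatus(outputPath);
    List<Footer> footers = ParquetFileReader.readAllFootersInParallel(conf, outputStatus);
    // writes <outputPath>/_metadata         (full footer, with row groups)
    // and    <outputPath>/_common_metadata  (row groups cleared: schema + key/value metadata only)
    ParquetFileWriter.writeMetadataFile(conf, outputPath, footers);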
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 0231ccd..d79ca51 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -15,9 +15,12 @@
  */
 package parquet.hadoop;
 
+import static parquet.Preconditions.checkArgument;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -62,8 +65,6 @@ import parquet.io.ParquetDecodingException;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
-import static parquet.Preconditions.checkArgument;
-
 /**
  * The input format to read a Parquet file.
  *
@@ -89,7 +90,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * key to configure the filter
    */
   public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
-  
+
   /**
    * key to configure type checking for conflicting schemas (default: true)
    */
@@ -100,11 +101,17 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
 
+  public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata";
+
   private static final int MIN_FOOTER_CACHE_SIZE = 100;
 
-  private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+  public static void setTaskSideMetaData(Job job,  boolean taskSideMetadata) {
+    ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
+  }
 
-  private Class<?> readSupportClass;
+  public static boolean isTaskSideMetaData(Configuration configuration) {
+    return configuration.getBoolean(TASK_SIDE_METADATA, Boolean.FALSE);
+  }
 
   public static void setReadSupportClass(Job job,  Class<?> readSupportClass) {
     ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
@@ -181,6 +188,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
   }
 
+  private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
+
+  private Class<?> readSupportClass;
+
   /**
    * Hadoop will instantiate using this constructor
    */
@@ -202,11 +213,8 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public RecordReader<Void, T> createRecordReader(
       InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-
-    ReadSupport<T> readSupport = getReadSupport(ContextUtil.getConfiguration(taskAttemptContext));
-
     Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
-
+    ReadSupport<T> readSupport = getReadSupport(conf);
     return new ParquetRecordReader<T>(readSupport, getFilter(conf));
   }
 
@@ -217,6 +225,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public ReadSupport<T> getReadSupport(Configuration configuration){
     try {
       if (readSupportClass == null) {
+        // TODO: fix this weird caching independent of the conf parameter
         readSupportClass = getReadSupportClass(configuration);
       }
       return (ReadSupport<T>)readSupportClass.newInstance();
@@ -227,195 +236,13 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  //Wrapper of hdfs blocks, keep track of which HDFS block is being used
-  private static class HDFSBlocks {
-    BlockLocation[] hdfsBlocks;
-    int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
-    int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
-
-    private HDFSBlocks(BlockLocation[] hdfsBlocks) {
-      this.hdfsBlocks = hdfsBlocks;
-      Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
-        @Override
-        public int compare(BlockLocation b1, BlockLocation b2) {
-          return Long.signum(b1.getOffset() - b2.getOffset());
-        }
-      };
-      Arrays.sort(hdfsBlocks, comparator);
-    }
-
-    private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
-      BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
-      return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
-    }
-
-    /**
-     * @param rowGroupMetadata
-     * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
-     * return false if the mid point of row group is in the same hdfs block
-     */
-    private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
-      boolean isNewHdfsBlock = false;
-      long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
-
-      //if mid point is not in the current HDFS block any more, return true
-      while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
-        isNewHdfsBlock = true;
-        currentMidPointHDFSBlockIndex++;
-        if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
-          throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
-                  + rowGroupMidPoint
-                  + ", the end of the hdfs block is "
-                  + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
-      }
-
-      while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
-        currentStartHdfsBlockIndex++;
-        if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
-          throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
-                  + rowGroupMetadata.getStartingPos()
-                  + " but the end of hdfs blocks of file is "
-                  + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
-      }
-      return isNewHdfsBlock;
-    }
-
-    public BlockLocation get(int hdfsBlockIndex) {
-      return hdfsBlocks[hdfsBlockIndex];
-    }
-
-    public BlockLocation getCurrentBlock() {
-      return hdfsBlocks[currentStartHdfsBlockIndex];
-    }
-  }
-
-  private static class SplitInfo {
-    List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
-    BlockLocation hdfsBlock;
-    long compressedByteSize = 0L;
-
-    public SplitInfo(BlockLocation currentBlock) {
-      this.hdfsBlock = currentBlock;
-    }
-
-    private void addRowGroup(BlockMetaData rowGroup) {
-      this.rowGroups.add(rowGroup);
-      this.compressedByteSize += rowGroup.getCompressedSize();
-    }
-
-    public long getCompressedByteSize() {
-      return compressedByteSize;
-    }
-
-    public List<BlockMetaData> getRowGroups() {
-      return rowGroups;
-    }
-
-    int getRowGroupCount() {
-      return rowGroups.size();
-    }
-
-    public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, FileMetaData fileMetaData, String requestedSchema, Map<String, String> readSupportMetadata, String fileSchema) throws IOException {
-      MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
-      long length = 0;
-
-      for (BlockMetaData block : this.getRowGroups()) {
-        List<ColumnChunkMetaData> columns = block.getColumns();
-        for (ColumnChunkMetaData column : columns) {
-          if (requested.containsPath(column.getPath().toArray())) {
-            length += column.getTotalSize();
-          }
-        }
-      }
-      return new ParquetInputSplit(
-              fileStatus.getPath(),
-              hdfsBlock.getOffset(),
-              length,
-              hdfsBlock.getHosts(),
-              this.getRowGroups(),
-              requestedSchema,
-              fileSchema,
-              fileMetaData.getKeyValueMetaData(),
-              readSupportMetadata
-      );
-    }
-  }
-
-  /**
-   * groups together all the data blocks for the same HDFS block
-   *
-   * @param rowGroupBlocks      data blocks (row groups)
-   * @param hdfsBlocksArray     hdfs blocks
-   * @param fileStatus          the containing file
-   * @param fileMetaData        file level meta data
-   * @param requestedSchema     the schema requested by the user
-   * @param readSupportMetadata the metadata provided by the readSupport implementation in init
-   * @param minSplitSize        the mapred.min.split.size
-   * @param maxSplitSize        the mapred.max.split.size
-   * @return the splits (one per HDFS block)
-   * @throws IOException If hosts can't be retrieved for the HDFS block
-   */
-  static <T> List<ParquetInputSplit> generateSplits(
-          List<BlockMetaData> rowGroupBlocks,
-          BlockLocation[] hdfsBlocksArray,
-          FileStatus fileStatus,
-          FileMetaData fileMetaData,
-          String requestedSchema,
-          Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
-    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
-      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
-    }
-    String fileSchema = fileMetaData.getSchema().toString().intern();
-    HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
-    hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
-    SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-
-    //assign rowGroups to splits
-    List<SplitInfo> splitRowGroups = new ArrayList<SplitInfo>();
-    checkSorted(rowGroupBlocks);//assert row groups are sorted
-    for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
-      if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
-             && currentSplit.getCompressedByteSize() >= minSplitSize
-             && currentSplit.getCompressedByteSize() > 0)
-           || currentSplit.getCompressedByteSize() >= maxSplitSize) {
-        //create a new split
-        splitRowGroups.add(currentSplit);//finish previous split
-        currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-      }
-      currentSplit.addRowGroup(rowGroupMetadata);
-    }
-
-    if (currentSplit.getRowGroupCount() > 0) {
-      splitRowGroups.add(currentSplit);
-    }
-
-    //generate splits from rowGroups of each split
-    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
-    for (SplitInfo splitInfo : splitRowGroups) {
-      ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, fileMetaData, requestedSchema, readSupportMetadata, fileSchema);
-      resultSplits.add(split);
-    }
-    return resultSplits;
-  }
-
-  private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
-    long previousOffset = 0L;
-    for(BlockMetaData rowGroup: rowGroupBlocks) {
-      long currentOffset = rowGroup.getStartingPos();
-      if (currentOffset < previousOffset) {
-        throw new ParquetDecodingException("row groups are not sorted: previous row groups starts at " + previousOffset + ", current row group starts at " + currentOffset);
-      }
-    }
-  }
-
   /**
    * {@inheritDoc}
    */
   @Override
   public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-    splits.addAll(getSplits(ContextUtil.getConfiguration(jobContext), getFooters(jobContext)));
-    return splits;
+    Configuration configuration = ContextUtil.getConfiguration(jobContext);
+    return new ArrayList<InputSplit>(getSplits(configuration, getFooters(jobContext)));
   }
 
   /**
@@ -425,63 +252,20 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * @throws IOException
    */
   public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
+    boolean taskSideMetaData = isTaskSideMetaData(configuration);
+    boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
     final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
     if (maxSplitSize < 0 || minSplitSize < 0) {
-      throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
+      throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
     }
-    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
-    GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, configuration.getBoolean(STRICT_TYPE_CHECKING, true));
+    GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
     ReadContext readContext = getReadSupport(configuration).init(new InitContext(
         configuration,
         globalMetaData.getKeyValueMetaData(),
         globalMetaData.getSchema()));
 
-    Filter filter = getFilter(configuration);
-
-    long rowGroupsDropped = 0;
-    long totalRowGroups = 0;
-
-    for (Footer footer : footers) {
-      final Path file = footer.getFile();
-      LOG.debug(file);
-      FileSystem fs = file.getFileSystem(configuration);
-      FileStatus fileStatus = fs.getFileStatus(file);
-      ParquetMetadata parquetMetaData = footer.getParquetMetadata();
-      List<BlockMetaData> blocks = parquetMetaData.getBlocks();
-
-      List<BlockMetaData> filteredBlocks = blocks;
-
-      totalRowGroups += blocks.size();
-      filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
-      rowGroupsDropped += blocks.size() - filteredBlocks.size();
-
-      if (filteredBlocks.isEmpty()) {
-        continue;
-      }
-
-      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-      splits.addAll(
-          generateSplits(
-              filteredBlocks,
-              fileBlockLocations,
-              fileStatus,
-              parquetMetaData.getFileMetaData(),
-              readContext.getRequestedSchema().toString(),
-              readContext.getReadSupportMetadata(),
-              minSplitSize,
-              maxSplitSize)
-          );
-    }
-
-    if (rowGroupsDropped > 0 && totalRowGroups > 0) {
-      int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
-      LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
-    } else {
-      LOG.info("There were no row groups that could be dropped due to filter predicates");
-    }
-
-    return splits;
+    return SplitStrategy.getSplitStrategy(taskSideMetaData).getSplits(configuration, footers, maxSplitSize, minSplitSize, readContext);
   }
 
   /*
@@ -539,7 +323,6 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     if (statuses.isEmpty()) {
       return Collections.emptyList();
     }
-
     Configuration config = ContextUtil.getConfiguration(jobContext);
     List<Footer> footers = new ArrayList<Footer>(statuses.size());
     Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
@@ -575,8 +358,7 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
       return footers;
     }
 
-    List<Footer> newFooters =
-            getFooters(config, new ArrayList<FileStatus>(missingStatuses));
+    List<Footer> newFooters = getFooters(config, missingStatuses);
     for (Footer newFooter : newFooters) {
       // Use the original file status objects to make sure we store a
       // conservative (older) modification time (i.e. in case the files and
@@ -590,6 +372,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     return footers;
   }
 
+  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
+    return getFooters(configuration, (Collection<FileStatus>)statuses);
+  }
+
   /**
    * the footers for the files
    * @param configuration to connect to the file system
@@ -597,9 +383,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * @return the footers of the files
    * @throws IOException
    */
-  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
+  public List<Footer> getFooters(Configuration configuration, Collection<FileStatus> statuses) throws IOException {
     if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
-    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses);
+    boolean taskSideMetaData = isTaskSideMetaData(configuration);
+    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, taskSideMetaData);
   }
 
   /**
@@ -688,3 +475,362 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
 }
+abstract class SplitStrategy {
+  private static final Log LOG = Log.getLog(SplitStrategy.class);
+
+  static SplitStrategy getSplitStrategy(boolean taskSideMetaData) {
+    if (taskSideMetaData) {
+      LOG.info("Using Task Side Metadata Split Strategy");
+      return new TaskSideMetadataSplitStrategy();
+    } else {
+      LOG.info("Using Client Side Metadata Split Strategy");
+      return new ClientSideMetadataSplitStrategy();
+    }
+  }
+
+  abstract List<ParquetInputSplit> getSplits(
+      Configuration configuration,
+      List<Footer> footers,
+      final long maxSplitSize, final long minSplitSize,
+      ReadContext readContext) throws IOException;
+}
+class TaskSideMetadataSplitStrategy extends SplitStrategy {
+
+  @Override
+  List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
+      long maxSplitSize, long minSplitSize, ReadContext readContext) throws IOException {
+    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+    for (Footer footer : footers) {
+      // TODO: keep status in Footer
+      final Path file = footer.getFile();
+      FileSystem fs = file.getFileSystem(configuration);
+      FileStatus fileStatus = fs.getFileStatus(file);
+      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      splits.addAll(generateTaskSideMDSplits(
+          fileBlockLocations,
+          fileStatus,
+          readContext.getRequestedSchema().toString(),
+          readContext.getReadSupportMetadata(),
+          minSplitSize,
+          maxSplitSize));
+
+    }
+    return splits;
+  }
+
+  private static int findBlockIndex(BlockLocation[] hdfsBlocksArray, long endOffset) {
+    for (int i = 0; i < hdfsBlocksArray.length; i++) {
+      BlockLocation block = hdfsBlocksArray[i];
+      // end offset is exclusive. We want the block that contains the point right before.
+      if (endOffset > block.getOffset() && endOffset <= (block.getOffset() + block.getLength())) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  static <T> List<ParquetInputSplit> generateTaskSideMDSplits(
+      BlockLocation[] hdfsBlocksArray,
+      FileStatus fileStatus,
+      String requestedSchema,
+      Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
+    }
+    //generate splits from rowGroups of each split
+    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+    // [startOffset, endOffset)
+    long startOffset = 0;
+    long endOffset = 0;
+    // they should already be sorted
+    Arrays.sort(hdfsBlocksArray, new Comparator<BlockLocation>() {
+      @Override
+      public int compare(BlockLocation o1, BlockLocation o2) {
+        return compare(o1.getOffset(), o2.getOffset());
+      }
+      private int compare(long x, long y) {
+        return (x < y) ? -1 : ((x == y) ? 0 : 1);
+      }
+    });
+    final BlockLocation lastBlock =
+        hdfsBlocksArray.length == 0 ? null : hdfsBlocksArray[hdfsBlocksArray.length - 1];
+    while (endOffset < fileStatus.getLen()) {
+      startOffset = endOffset;
+      BlockLocation blockLocation;
+      final int nextBlockMin = findBlockIndex(hdfsBlocksArray, startOffset + minSplitSize);
+      final int nextBlockMax = findBlockIndex(hdfsBlocksArray, startOffset + maxSplitSize);
+      if (nextBlockMax == nextBlockMin && nextBlockMax != -1) {
+        // no block boundary between min and max
+        // => use max for the size of the split
+        endOffset = startOffset + maxSplitSize;
+        blockLocation = hdfsBlocksArray[nextBlockMax];
+      } else if (nextBlockMin > -1) {
+        // block boundary between min and max
+        // we end the split at the first block boundary
+        blockLocation = hdfsBlocksArray[nextBlockMin];
+        endOffset = blockLocation.getOffset() + blockLocation.getLength();
+      } else {
+        // min and max after last block
+        // small last split
+        endOffset = fileStatus.getLen();
+        blockLocation = lastBlock;
+      }
+      resultSplits.add(
+          new ParquetInputSplit(
+              fileStatus.getPath(),
+              startOffset, endOffset, endOffset - startOffset,
+              blockLocation == null ? new String[0] : blockLocation.getHosts(),
+              null,
+              requestedSchema, readSupportMetadata));
+    }
+    return resultSplits;
+  }
+}
+class ClientSideMetadataSplitStrategy extends SplitStrategy {
+  //Wrapper of hdfs blocks, keep track of which HDFS block is being used
+  private static class HDFSBlocks {
+    BlockLocation[] hdfsBlocks;
+    int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
+    int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
+
+    private HDFSBlocks(BlockLocation[] hdfsBlocks) {
+      this.hdfsBlocks = hdfsBlocks;
+      Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
+        @Override
+        public int compare(BlockLocation b1, BlockLocation b2) {
+          return Long.signum(b1.getOffset() - b2.getOffset());
+        }
+      };
+      Arrays.sort(hdfsBlocks, comparator);
+    }
+
+    private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
+      BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
+      return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
+    }
+
+    /**
+     * @param rowGroupMetadata
+     * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
+     * return false if the mid point of row group is in the same hdfs block
+     */
+    private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
+      boolean isNewHdfsBlock = false;
+      long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
+
+      //if mid point is not in the current HDFS block any more, return true
+      while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
+        isNewHdfsBlock = true;
+        currentMidPointHDFSBlockIndex++;
+        if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
+                  + rowGroupMidPoint
+                  + ", the end of the hdfs block is "
+                  + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
+      }
+
+      while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
+        currentStartHdfsBlockIndex++;
+        if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
+                  + rowGroupMetadata.getStartingPos()
+                  + " but the end of hdfs blocks of file is "
+                  + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
+      }
+      return isNewHdfsBlock;
+    }
+
+    public BlockLocation getCurrentBlock() {
+      return hdfsBlocks[currentStartHdfsBlockIndex];
+    }
+  }
+
+  static class SplitInfo {
+    List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
+    BlockLocation hdfsBlock;
+    long compressedByteSize = 0L;
+
+    public SplitInfo(BlockLocation currentBlock) {
+      this.hdfsBlock = currentBlock;
+    }
+
+    private void addRowGroup(BlockMetaData rowGroup) {
+      this.rowGroups.add(rowGroup);
+      this.compressedByteSize += rowGroup.getCompressedSize();
+    }
+
+    public long getCompressedByteSize() {
+      return compressedByteSize;
+    }
+
+    public List<BlockMetaData> getRowGroups() {
+      return rowGroups;
+    }
+
+    int getRowGroupCount() {
+      return rowGroups.size();
+    }
+
+    public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
+      MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+      long length = 0;
+
+      for (BlockMetaData block : this.getRowGroups()) {
+        List<ColumnChunkMetaData> columns = block.getColumns();
+        for (ColumnChunkMetaData column : columns) {
+          if (requested.containsPath(column.getPath().toArray())) {
+            length += column.getTotalSize();
+          }
+        }
+      }
+
+      BlockMetaData lastRowGroup = this.getRowGroups().get(this.getRowGroupCount() - 1);
+      long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
+
+      long[] rowGroupOffsets = new long[this.getRowGroupCount()];
+      for (int i = 0; i < rowGroupOffsets.length; i++) {
+        rowGroupOffsets[i] = this.getRowGroups().get(i).getStartingPos();
+      }
+
+      return new ParquetInputSplit(
+              fileStatus.getPath(),
+              hdfsBlock.getOffset(),
+              end,
+              length,
+              hdfsBlock.getHosts(),
+              rowGroupOffsets,
+              requestedSchema,
+              readSupportMetadata
+      );
+    }
+  }
+
+  private static final Log LOG = Log.getLog(ClientSideMetadataSplitStrategy.class);
+
+  @Override
+  List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
+      long maxSplitSize, long minSplitSize, ReadContext readContext)
+      throws IOException {
+    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
+    Filter filter = ParquetInputFormat.getFilter(configuration);
+
+    long rowGroupsDropped = 0;
+    long totalRowGroups = 0;
+
+    for (Footer footer : footers) {
+      final Path file = footer.getFile();
+      LOG.debug(file);
+      FileSystem fs = file.getFileSystem(configuration);
+      FileStatus fileStatus = fs.getFileStatus(file);
+      ParquetMetadata parquetMetaData = footer.getParquetMetadata();
+      List<BlockMetaData> blocks = parquetMetaData.getBlocks();
+
+      List<BlockMetaData> filteredBlocks;
+
+      totalRowGroups += blocks.size();
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
+      rowGroupsDropped += blocks.size() - filteredBlocks.size();
+
+      if (filteredBlocks.isEmpty()) {
+        continue;
+      }
+
+      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      splits.addAll(
+          generateSplits(
+              filteredBlocks,
+              fileBlockLocations,
+              fileStatus,
+              readContext.getRequestedSchema().toString(),
+              readContext.getReadSupportMetadata(),
+              minSplitSize,
+              maxSplitSize)
+          );
+    }
+
+    if (rowGroupsDropped > 0 && totalRowGroups > 0) {
+      int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
+      LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
+    } else {
+      LOG.info("There were no row groups that could be dropped due to filter predicates");
+    }
+    return splits;
+  }
+
+  /**
+   * groups together all the data blocks for the same HDFS block
+   *
+   * @param rowGroupBlocks      data blocks (row groups)
+   * @param hdfsBlocksArray     hdfs blocks
+   * @param fileStatus          the containing file
+   * @param requestedSchema     the schema requested by the user
+   * @param readSupportMetadata the metadata provided by the readSupport implementation in init
+   * @param minSplitSize        the mapred.min.split.size
+   * @param maxSplitSize        the mapred.max.split.size
+   * @return the splits (one per HDFS block)
+   * @throws IOException If hosts can't be retrieved for the HDFS block
+   */
+  static <T> List<ParquetInputSplit> generateSplits(
+          List<BlockMetaData> rowGroupBlocks,
+          BlockLocation[] hdfsBlocksArray,
+          FileStatus fileStatus,
+          String requestedSchema,
+          Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+
+    List<SplitInfo> splitRowGroups =
+        generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);
+
+    // generate one ParquetInputSplit from the row groups of each SplitInfo
+    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+    for (SplitInfo splitInfo : splitRowGroups) {
+      ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
+      resultSplits.add(split);
+    }
+    return resultSplits;
+  }
+
+  static List<SplitInfo> generateSplitInfo(
+      List<BlockMetaData> rowGroupBlocks,
+      BlockLocation[] hdfsBlocksArray,
+      long minSplitSize, long maxSplitSize) {
+    List<SplitInfo> splitRowGroups;
+
+    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize and minSplitSize should not be negative and maxSplitSize should be greater than or equal to minSplitSize: maxSplitSize = " + maxSplitSize + ", minSplitSize = " + minSplitSize);
+    }
+    HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
+    hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
+    SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+
+    // assign row groups to splits
+    splitRowGroups = new ArrayList<SplitInfo>();
+    checkSorted(rowGroupBlocks); // assert row groups are sorted by starting position
+    for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
+      if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
+             && currentSplit.getCompressedByteSize() >= minSplitSize
+             && currentSplit.getCompressedByteSize() > 0)
+           || currentSplit.getCompressedByteSize() >= maxSplitSize) {
+        //create a new split
+        splitRowGroups.add(currentSplit);//finish previous split
+        currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+      }
+      currentSplit.addRowGroup(rowGroupMetadata);
+    }
+
+    if (currentSplit.getRowGroupCount() > 0) {
+      splitRowGroups.add(currentSplit);
+    }
+
+    return splitRowGroups;
+  }
+
+  private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
+    long previousOffset = 0L;
+    for (BlockMetaData rowGroup : rowGroupBlocks) {
+      long currentOffset = rowGroup.getStartingPos();
+      if (currentOffset < previousOffset) {
+        throw new ParquetDecodingException("row groups are not sorted: the previous row group starts at " + previousOffset + ", the current row group starts at " + currentOffset);
+      }
+      previousOffset = currentOffset;
+    }
+  }
+}
\ No newline at end of file
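
For reference, a minimal sketch (not part of this patch) of how the client-side strategy above is driven: read the footer, look up the HDFS block locations, group the row groups into one SplitInfo per HDFS block, and turn each SplitInfo into a ParquetInputSplit. It assumes the code lives in the parquet.hadoop package (generateSplitInfo, SplitInfo and getParquetInputSplit are package-private), that ParquetFileReader.readFooter(Configuration, Path) is available, and that the class name ClientSideSplitSketch and the local variable names are invented for illustration. The task-side strategy earlier in this file instead builds splits from HDFS block boundaries alone and passes null row group offsets.

    package parquet.hadoop; // the split helpers used below are package-private

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import parquet.hadoop.ClientSideMetadataSplitStrategy.SplitInfo;
    import parquet.hadoop.metadata.BlockMetaData;
    import parquet.hadoop.metadata.ParquetMetadata;

    public class ClientSideSplitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);

        // read the footer on the client and look up the HDFS blocks of the file
        ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
        List<BlockMetaData> rowGroups = footer.getBlocks(); // assumes at least one row group
        FileSystem fs = path.getFileSystem(conf);
        FileStatus status = fs.getFileStatus(path);
        BlockLocation[] hdfsBlocks = fs.getFileBlockLocations(status, 0, status.getLen());

        // group row groups into one SplitInfo per HDFS block: a new split starts when the
        // midpoint of a row group crosses into a new block (and the current split has at
        // least minSplitSize bytes), or when the current split already exceeds maxSplitSize
        long minSplitSize = 0;
        long maxSplitSize = 128 * 1024 * 1024;
        List<SplitInfo> splitInfos =
            ClientSideMetadataSplitStrategy.generateSplitInfo(rowGroups, hdfsBlocks, minSplitSize, maxSplitSize);

        // each SplitInfo becomes one ParquetInputSplit carrying the selected row group offsets
        String requestedSchema = footer.getFileMetaData().getSchema().toString();
        Map<String, String> readSupportMetadata = null; // would come from ReadSupport.init in a real job
        for (SplitInfo info : splitInfos) {
          System.out.println(info.getParquetInputSplit(status, requestedSchema, readSupportMetadata));
        }
      }
    }
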

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
index da0c2ec..399be64 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -15,54 +15,47 @@
  */
 package parquet.hadoop;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
-import parquet.Log;
-import parquet.column.Encoding;
-import parquet.column.statistics.IntStatistics;
-import parquet.common.schema.ColumnPath;
+import parquet.bytes.BytesUtils;
 import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 /**
  * An input split for the Parquet format
  * It contains the information to read one block of the file.
  *
+ * This class is private to the ParquetInputFormat.
+ * Backward compatibility is not maintained.
+ *
  * @author Julien Le Dem
  */
+@Private
 public class ParquetInputSplit extends FileSplit implements Writable {
 
-  private static final Log LOG = Log.getLog(ParquetInputSplit.class);
 
-  private List<BlockMetaData> blocks;
+  private long end;
+  private long[] rowGroupOffsets;
   private String requestedSchema;
-  private String fileSchema;
-  private Map<String, String> extraMetadata;
   private Map<String, String> readSupportMetadata;
 
-
   /**
    * Writables must have a parameterless constructor
    */
@@ -71,19 +64,19 @@ public class ParquetInputSplit extends FileSplit implements Writable {
   }
 
   /**
-   * Used by {@link ParquetInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}
-   * @param path the path to the file
-   * @param start the offset of the block in the file
-   * @param length the size of the block in the file
-   * @param hosts the hosts where this block can be found
-   * @param blocks the block meta data (Columns locations)
-   * @param schema the file schema
-   * @param readSupportClass the class used to materialize records
-   * @param requestedSchema the requested schema for materialization
-   * @param fileSchema the schema of the file
-   * @param extraMetadata the app specific meta data in the file
-   * @param readSupportMetadata the read support specific metadata
+   * For compatibility only
+   * use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[], String, Map)}
+   * @param path
+   * @param start
+   * @param length
+   * @param hosts
+   * @param blocks
+   * @param requestedSchema
+   * @param fileSchema
+   * @param extraMetadata
+   * @param readSupportMetadata
    */
+  @Deprecated
   public ParquetInputSplit(
       Path path,
       long start,
@@ -94,212 +87,168 @@ public class ParquetInputSplit extends FileSplit implements Writable {
       String fileSchema,
       Map<String, String> extraMetadata,
       Map<String, String> readSupportMetadata) {
-    super(path, start, length, hosts);
-    this.blocks = blocks;
-    this.requestedSchema = requestedSchema;
-    this.fileSchema = fileSchema;
-    this.extraMetadata = extraMetadata;
-    this.readSupportMetadata = readSupportMetadata;
+    this(
+        path, start, length, end(blocks), hosts,
+        offsets(blocks),
+        requestedSchema, readSupportMetadata
+        );
+  }
+
+  private static long end(List<BlockMetaData> blocks) {
+    BlockMetaData last = blocks.get(blocks.size() - 1);
+    return last.getStartingPos() + last.getCompressedSize();
+  }
+
+  private static long[] offsets(List<BlockMetaData> blocks) {
+    long[] offsets = new long[blocks.size()];
+    for (int i = 0; i < offsets.length; i++) {
+      offsets[i] = blocks.get(i).getStartingPos();
+    }
+    return offsets;
   }
 
   /**
-   * @return the block meta data
+   * @param file the path of the file for that split
+   * @param start the start offset in the file
+   * @param end the end offset in the file
+   * @param length the actual size in bytes that we expect to read
+   * @param hosts the hosts with the replicas of this data
+   * @param rowGroupOffsets the offsets of the rowgroups selected if loaded on the client
+   * @param requestedSchema the user requested schema
+   * @param readSupportMetadata metadata from the read support
    */
-  public List<BlockMetaData> getBlocks() {
-    return blocks;
+  public ParquetInputSplit(
+      Path file, long start, long end, long length, String[] hosts,
+      long[] rowGroupOffsets,
+      String requestedSchema,
+      Map<String, String> readSupportMetadata) {
+    super(file, start, length, hosts);
+    this.end = end;
+    this.rowGroupOffsets = rowGroupOffsets;
+    this.requestedSchema = requestedSchema;
+    this.readSupportMetadata = readSupportMetadata;
   }
 
   /**
    * @return the requested schema
    */
-  public String getRequestedSchema() {
+  String getRequestedSchema() {
     return requestedSchema;
   }
 
   /**
-   * @return the file schema
+   * @return the end offset of that split
    */
-  public String getFileSchema() {
-    return fileSchema;
+  long getEnd() {
+    return end;
   }
 
   /**
-   * @return app specific metadata from the file
+   * @return app specific metadata provided by the read support in the init phase
    */
-  public Map<String, String> getExtraMetadata() {
-    return extraMetadata;
+  Map<String, String> getReadSupportMetadata() {
+    return readSupportMetadata;
   }
 
   /**
-   * @return app specific metadata provided by the read support in the init phase
+   * @return the offsets of the row group selected if this has been determined on the client side
    */
-  public Map<String, String> getReadSupportMetadata() {
-    return readSupportMetadata;
+  long[] getRowGroupOffsets() {
+    return rowGroupOffsets;
+  }
+
+  @Override
+  public String toString() {
+    String hosts;
+    try {
+      hosts = Arrays.toString(getLocations());
+    } catch (Exception e) {
+      // IOException/InterruptedException could be thrown
+      hosts = "(" + e + ")";
+    }
+
+    return this.getClass().getSimpleName() + "{" +
+           "part: " + getPath()
+        + " start: " + getStart()
+        + " end: " + getEnd()
+        + " length: " + getLength()
+        + " hosts: " + hosts
+        + (rowGroupOffsets == null ? "" : (" row groups: " + Arrays.toString(rowGroupOffsets)))
+        + " requestedSchema: " +  requestedSchema
+        + " readSupportMetadata: " + readSupportMetadata
+        + "}";
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void readFields(DataInput in) throws IOException {
+  public final void readFields(DataInput hin) throws IOException {
+    byte[] bytes = readArray(hin);
+    DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)));
     super.readFields(in);
-    int blocksSize = in.readInt();
-    this.blocks = new ArrayList<BlockMetaData>(blocksSize);
-    for (int i = 0; i < blocksSize; i++) {
-      blocks.add(readBlock(in));
+    this.end = in.readLong();
+    if (in.readBoolean()) {
+      this.rowGroupOffsets = new long[in.readInt()];
+      for (int i = 0; i < rowGroupOffsets.length; i++) {
+        rowGroupOffsets[i] = in.readLong();
+      }
     }
-    this.requestedSchema = decompressString(in);
-    this.fileSchema = decompressString(in);
-    this.extraMetadata = readKeyValues(in);
+    this.requestedSchema = readUTF8(in);
     this.readSupportMetadata = readKeyValues(in);
+    in.close();
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void write(DataOutput out) throws IOException {
+  public final void write(DataOutput hout) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
     super.write(out);
-    out.writeInt(blocks.size());
-    for (BlockMetaData block : blocks) {
-      writeBlock(out, block);
-    }
-    byte[] compressedSchema = compressString(requestedSchema);
-    out.writeInt(compressedSchema.length);
-    out.write(compressedSchema);
-    compressedSchema = compressString(fileSchema);
-    out.writeInt(compressedSchema.length);
-    out.write(compressedSchema);
-    writeKeyValues(out, extraMetadata);
-    writeKeyValues(out, readSupportMetadata);
-  }
-
-  byte[] compressString(String str) {
-    ByteArrayOutputStream obj = new ByteArrayOutputStream();
-    GZIPOutputStream gzip;
-    try {
-      gzip = new GZIPOutputStream(obj);
-      gzip.write(str.getBytes("UTF-8"));
-      gzip.close();
-    } catch (IOException e) {
-      // Not really sure how we can get here. I guess the best thing to do is to croak.
-      LOG.error("Unable to gzip InputSplit string " + str, e);
-      throw new RuntimeException("Unable to gzip InputSplit string", e);
-    }
-    return obj.toByteArray();
-  }
-
-  String decompressString(DataInput in) throws IOException {
-    int len = in.readInt();
-    byte[] bytes = new byte[len];
-    in.readFully(bytes);
-    return decompressString(bytes);
-  }
-
-  String decompressString(byte[] bytes) {
-    ByteArrayInputStream obj = new ByteArrayInputStream(bytes);
-    GZIPInputStream gzip = null;
-    String outStr = "";
-    try {
-      gzip = new GZIPInputStream(obj);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(gzip, "UTF-8"));
-      char[] buffer = new char[1024];
-      int n = 0;
-      StringBuilder sb = new StringBuilder();
-      while (-1 != (n = reader.read(buffer))) {
-        sb.append(buffer, 0, n);
-      }
-      outStr = sb.toString();
-    } catch (IOException e) {
-      // Not really sure how we can get here. I guess the best thing to do is to croak.
-      LOG.error("Unable to uncompress InputSplit string", e);
-      throw new RuntimeException("Unable to uncompress InputSplit String", e);
-    } finally {
-      if (null != gzip) {
-        try {
-          gzip.close();
-        } catch (IOException e) {
-          LOG.error("Unable to uncompress InputSplit", e);
-          throw new RuntimeException("Unable to uncompress InputSplit String", e);
-        }
+    out.writeLong(end);
+    out.writeBoolean(rowGroupOffsets != null);
+    if (rowGroupOffsets != null) {
+      out.writeInt(rowGroupOffsets.length);
+      for (long o : rowGroupOffsets) {
+        out.writeLong(o);
       }
     }
-    return outStr;
+    writeUTF8(out, requestedSchema);
+    writeKeyValues(out, readSupportMetadata);
+    out.close();
+    writeArray(hout, baos.toByteArray());
   }
 
-  private BlockMetaData readBlock(DataInput in) throws IOException {
-    final BlockMetaData block = new BlockMetaData();
-    int size = in.readInt();
-    for (int i = 0; i < size; i++) {
-      block.addColumn(readColumn(in));
-    }
-    block.setRowCount(in.readLong());
-    block.setTotalByteSize(in.readLong());
-    if (!in.readBoolean()) {
-      block.setPath(in.readUTF().intern());
-    }
-    return block;
+  private static void writeUTF8(DataOutput out, String string) throws IOException {
+    byte[] bytes = string.getBytes(BytesUtils.UTF8);
+    writeArray(out, bytes);
   }
 
-  private void writeBlock(DataOutput out, BlockMetaData block)
-      throws IOException {
-    out.writeInt(block.getColumns().size());
-    for (ColumnChunkMetaData column : block.getColumns()) {
-      writeColumn(out, column);
-    }
-    out.writeLong(block.getRowCount());
-    out.writeLong(block.getTotalByteSize());
-    out.writeBoolean(block.getPath() == null);
-    if (block.getPath() != null) {
-      out.writeUTF(block.getPath());
-    }
+  private static String readUTF8(DataInput in) throws IOException {
+    byte[] bytes = readArray(in);
+    return new String(bytes, BytesUtils.UTF8).intern();
   }
 
-  private ColumnChunkMetaData readColumn(DataInput in)
-      throws IOException {
-    CompressionCodecName codec = CompressionCodecName.values()[in.readInt()];
-    String[] columnPath = new String[in.readInt()];
-    for (int i = 0; i < columnPath.length; i++) {
-      columnPath[i] = in.readUTF().intern();
-    }
-    PrimitiveTypeName type = PrimitiveTypeName.values()[in.readInt()];
-    int encodingsSize = in.readInt();
-    Set<Encoding> encodings = new HashSet<Encoding>(encodingsSize);
-    for (int i = 0; i < encodingsSize; i++) {
-      encodings.add(Encoding.values()[in.readInt()]);
-    }
-    IntStatistics emptyStats = new IntStatistics();
-    ColumnChunkMetaData column = ColumnChunkMetaData.get(
-        ColumnPath.get(columnPath), type, codec, encodings, emptyStats,
-        in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
-    return column;
+  private static void writeArray(DataOutput out, byte[] bytes) throws IOException {
+    out.writeInt(bytes.length);
+    out.write(bytes, 0, bytes.length);
   }
 
-  private void writeColumn(DataOutput out, ColumnChunkMetaData column)
-      throws IOException {
-    out.writeInt(column.getCodec().ordinal());
-    out.writeInt(column.getPath().size());
-    for (String s : column.getPath()) {
-      out.writeUTF(s);
-    }
-    out.writeInt(column.getType().ordinal());
-    out.writeInt(column.getEncodings().size());
-    for (Encoding encoding : column.getEncodings()) {
-      out.writeInt(encoding.ordinal());
-    }
-    out.writeLong(column.getFirstDataPageOffset());
-    out.writeLong(column.getDictionaryPageOffset());
-    out.writeLong(column.getValueCount());
-    out.writeLong(column.getTotalSize());
-    out.writeLong(column.getTotalUncompressedSize());
+  private static byte[] readArray(DataInput in) throws IOException {
+    int len = in.readInt();
+    byte[] bytes = new byte[len];
+    in.readFully(bytes);
+    return bytes;
   }
 
   private Map<String, String> readKeyValues(DataInput in) throws IOException {
     int size = in.readInt();
     Map<String, String> map = new HashMap<String, String>(size);
     for (int i = 0; i < size; i++) {
-      String key = decompressString(in).intern();
-      String value = decompressString(in).intern();
+      String key = readUTF8(in).intern();
+      String value = readUTF8(in).intern();
       map.put(key, value);
     }
     return map;
@@ -311,35 +260,10 @@ public class ParquetInputSplit extends FileSplit implements Writable {
     } else {
       out.writeInt(map.size());
       for (Entry<String, String> entry : map.entrySet()) {
-        byte[] compr = compressString(entry.getKey());
-        out.writeInt(compr.length);
-        out.write(compr);
-        compr = compressString(entry.getValue());
-        out.writeInt(compr.length);
-        out.write(compr);
+        writeUTF8(out, entry.getKey());
+        writeUTF8(out, entry.getValue());
       }
     }
   }
 
-
-  @Override
-  public String toString() {
-    String hosts[] = {};
-    try{
-       hosts = getLocations();
-    }catch(Exception ignore){} // IOException/InterruptedException could be thrown
-
-    return this.getClass().getSimpleName() + "{" +
-           "part: " + getPath()
-        + " start: " + getStart()
-        + " length: " + getLength()
-        + " hosts: " + Arrays.toString(hosts)
-        + " blocks: " + blocks.size()
-        + " requestedSchema: " + (fileSchema.equals(requestedSchema) ? "same as file" : requestedSchema)
-        + " fileSchema: " + fileSchema
-        + " extraMetadata: " + extraMetadata
-        + " readSupportMetadata: " + readSupportMetadata
-        + "}";
-  }
-
 }
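
For reference, a minimal sketch (not part of this patch) of the new, compact Writable round trip: write() gzips the serialized fields into a length-prefixed byte array and readFields() inflates them back. It assumes a class in the parquet.hadoop package, and the class name, file path, hosts, row group offsets and schema string are made-up values for illustration.

    package parquet.hadoop; // same package so the no-arg constructor and package-private getters are reachable

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Collections;

    import org.apache.hadoop.fs.Path;

    public class ParquetInputSplitRoundTrip {
      public static void main(String[] args) throws Exception {
        // hypothetical split: one file, two selected row groups (client-side metadata case)
        ParquetInputSplit split = new ParquetInputSplit(
            new Path("/tmp/example.parquet"),        // file
            0L,                                      // start offset
            1024L,                                   // end offset
            512L,                                    // bytes we actually expect to read (projected columns)
            new String[] { "host1", "host2" },       // replica hosts
            new long[] { 4L, 600L },                 // row group offsets
            "message example { required int32 id; }",
            Collections.<String, String>emptyMap()); // read support metadata

        // write() serializes everything into a gzipped, length-prefixed byte array
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        split.write(new DataOutputStream(out));

        // readFields() inflates it back; the no-arg constructor is the usual Writable contract
        ParquetInputSplit copy = new ParquetInputSplit();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));

        System.out.println(copy); // toString() shows path, start/end, length, row group offsets, schema
      }
    }

Gzipping the whole payload, rather than individual strings as before, keeps the serialized split small even when the requested schema and read support metadata are large.
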

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
index 31917d2..940b893 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
@@ -18,7 +18,6 @@ package parquet.hadoop;
 import java.io.IOException;
 import java.util.List;
 
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index 6703001..74f4051 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import parquet.Log;
-import parquet.Preconditions;
 import parquet.column.ParquetProperties.WriterVersion;
 import parquet.hadoop.api.WriteSupport;
 import parquet.hadoop.api.WriteSupport.WriteContext;

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/5dafd127/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
index c56a402..ec839e2 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
@@ -15,6 +15,8 @@
  */
 package parquet.hadoop;
 
+import static parquet.Preconditions.checkNotNull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
@@ -39,8 +41,6 @@ import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.GlobalMetaData;
 import parquet.schema.MessageType;
 
-import static parquet.Preconditions.checkNotNull;
-
 /**
  * Read records from a Parquet file.
  * TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
@@ -114,7 +114,7 @@ public class ParquetReader<T> implements Closeable {
 
     FileSystem fs = file.getFileSystem(conf);
     List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
     this.footersIterator = footers.iterator();
     globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
     MessageType schema = globalMetaData.getSchema();