You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/02/06 00:06:23 UTC

incubator-parquet-mr git commit: PARQUET-139: Avoid reading footers when using task-side metadata

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 05adc21b1 -> ce65dfb39


PARQUET-139: Avoid reading footers when using task-side metadata

This updates the InternalParquetRecordReader to initialize the ReadContext in each task rather than once for an entire job. There are two reasons for this change:

1. For correctness, the requested projection schema must be validated against each file schema, not once using the merged schema.
2. To avoid reading file footers on the client side, which is a performance bottleneck.

Because the read context is reinitialized in every task, it is no longer necessary to pass the its contents to each task in ParquetInputSplit. The fields and accessors have been removed.

This also adds a new InputFormat, ParquetFileInputFormat that uses FileSplits instead of ParquetSplits. It goes through the normal ParquetRecordReader and creates a ParquetSplit on the task side. This is to avoid accidental behavior changes in ParquetInputFormat.

Author: Ryan Blue <bl...@apache.org>

Closes #91 from rdblue/PARQUET-139-input-format-task-side and squashes the following commits:

cb30660 [Ryan Blue] PARQUET-139: Fix deprecated reader bug from review fixes.
09cde8d [Ryan Blue] PARQUET-139: Implement changes from reviews.
3eec553 [Ryan Blue] PARQUET-139: Merge new InputFormat into ParquetInputFormat.
8971b80 [Ryan Blue] PARQUET-139: Add ParquetFileInputFormat that uses FileSplit.
87dfe86 [Ryan Blue] PARQUET-139: Expose read support helper methods.
057c7dc [Ryan Blue] PARQUET-139: Update reader to initialize read context in tasks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/ce65dfb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/ce65dfb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/ce65dfb3

Branch: refs/heads/master
Commit: ce65dfb394623c34dd7919aba5c0687f1bcf39f2
Parents: 05adc21
Author: Ryan Blue <bl...@apache.org>
Authored: Thu Feb 5 15:06:12 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Thu Feb 5 15:06:12 2015 -0800

----------------------------------------------------------------------
 .../main/java/parquet/avro/AvroReadSupport.java |   4 +-
 .../hadoop/InternalParquetRecordReader.java     |  30 ++-
 .../java/parquet/hadoop/ParquetInputFormat.java | 188 ++++++-------------
 .../java/parquet/hadoop/ParquetInputSplit.java  | 101 +++-------
 .../main/java/parquet/hadoop/ParquetReader.java |  22 +--
 .../parquet/hadoop/ParquetRecordReader.java     |  28 ++-
 .../mapred/DeprecatedParquetInputFormat.java    |  32 +++-
 .../java/parquet/hadoop/TestInputFormat.java    | 163 ----------------
 .../read/ParquetRecordReaderWrapper.java        |   4 +-
 9 files changed, 168 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
index 24eca13..c82977e 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
@@ -77,7 +77,9 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
       metadata = new LinkedHashMap<String, String>();
       metadata.put(AVRO_READ_SCHEMA_METADATA_KEY, avroReadSchema);
     }
-    return new ReadContext(schema, metadata);
+    // use getSchemaForRead because it checks that the requested schema is a
+    // subset of the columns in the file schema
+    return new ReadContext(getSchemaForRead(fileSchema, schema), metadata);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
index 9b72d8d..8ae7c57 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
@@ -19,9 +19,13 @@
 package parquet.hadoop;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -31,6 +35,7 @@ import parquet.column.page.PageReadStore;
 import parquet.filter.UnboundRecordFilter;
 import parquet.filter2.compat.FilterCompat;
 import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.util.counters.BenchmarkCounter;
@@ -155,17 +160,19 @@ class InternalParquetRecordReader<T> {
     return (float) current / total;
   }
 
-  public void initialize(MessageType requestedSchema, MessageType fileSchema,
-      Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+  public void initialize(MessageType fileSchema,
+      Map<String, String> fileMetadata,
       Path file, List<BlockMetaData> blocks, Configuration configuration)
       throws IOException {
-    this.requestedSchema = requestedSchema;
+    // initialize a ReadContext for this file
+    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+        configuration, toSetMultiMap(fileMetadata), fileSchema));
+    this.requestedSchema = readContext.getRequestedSchema();
     this.fileSchema = fileSchema;
     this.file = file;
-    this.columnCount = this.requestedSchema.getPaths().size();
+    this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(
-        configuration, extraMetadata, fileSchema,
-        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
+        configuration, fileMetadata, fileSchema, readContext);
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     reader = new ParquetFileReader(configuration, file, blocks, columns);
@@ -223,4 +230,15 @@ class InternalParquetRecordReader<T> {
     }
     return true;
   }
+
+  private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
+    Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
+    for (Map.Entry<K, V> entry : map.entrySet()) {
+      Set<V> set = new HashSet<V>();
+      set.add(entry.getValue());
+      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+    }
+    return Collections.unmodifiableMap(setMultiMap);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 6b3958d..8728965 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -48,7 +48,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import parquet.Log;
+import parquet.Preconditions;
 import parquet.filter.UnboundRecordFilter;
 import parquet.filter2.compat.FilterCompat;
 import parquet.filter2.compat.FilterCompat.Filter;
@@ -204,17 +206,23 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
 
   private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
 
-  private Class<?> readSupportClass;
+  private final Class<? extends ReadSupport<T>> readSupportClass;
 
   /**
    * Hadoop will instantiate using this constructor
    */
   public ParquetInputFormat() {
+    this.readSupportClass = null;
   }
 
   /**
-   * constructor used when this InputFormat in wrapped in another one (In Pig for example)
-   * @param readSupportClass the class to materialize records
+   * Constructor for subclasses, such as AvroParquetInputFormat, or wrappers.
+   * <p>
+   * Subclasses and wrappers may use this constructor to set the ReadSupport
+   * class that will be used when reading instead of requiring the user to set
+   * the read support property in their configuration.
+   *
+   * @param readSupportClass a ReadSupport subclass
    */
   public <S extends ReadSupport<T>> ParquetInputFormat(Class<S> readSupportClass) {
     this.readSupportClass = readSupportClass;
@@ -235,14 +243,35 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * @param configuration to find the configuration for the read support
    * @return the configured read support
+   * @deprecated use getReadSupportInstance static methods instead
+   */
+  @Deprecated
+  @SuppressWarnings("unchecked")
+  ReadSupport<T> getReadSupport(Configuration configuration){
+    return getReadSupportInstance(readSupportClass == null ?
+        (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration) :
+        readSupportClass);
+  }
+
+  /**
+   * @param configuration to find the configuration for the read support
+   * @return the configured read support
    */
-  public ReadSupport<T> getReadSupport(Configuration configuration){
+  @SuppressWarnings("unchecked")
+  public static <T> ReadSupport<T> getReadSupportInstance(Configuration configuration){
+    return getReadSupportInstance(
+        (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration));
+  }
+
+  /**
+   * @param readSupportClass to instantiate
+   * @return the configured read support
+   */
+  @SuppressWarnings("unchecked")
+  static <T> ReadSupport<T> getReadSupportInstance(
+      Class<? extends ReadSupport<T>> readSupportClass){
     try {
-      if (readSupportClass == null) {
-        // TODO: fix this weird caching independent of the conf parameter
-        readSupportClass = getReadSupportClass(configuration);
-      }
-      return (ReadSupport<T>)readSupportClass.newInstance();
+      return readSupportClass.newInstance();
     } catch (InstantiationException e) {
       throw new BadConfigurationException("could not instantiate read support class", e);
     } catch (IllegalAccessException e) {
@@ -256,7 +285,23 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   @Override
   public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
     Configuration configuration = ContextUtil.getConfiguration(jobContext);
-    return new ArrayList<InputSplit>(getSplits(configuration, getFooters(jobContext)));
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    if (isTaskSideMetaData(configuration)) {
+      // Although not required by the API, some clients may depend on always
+      // receiving ParquetInputSplit. Translation is required at some point.
+      for (InputSplit split : super.getSplits(jobContext)) {
+        Preconditions.checkArgument(split instanceof FileSplit,
+            "Cannot wrap non-FileSplit: " + split);
+        splits.add(ParquetInputSplit.from((FileSplit) split));
+      }
+      return splits;
+
+    } else {
+      splits.addAll(getSplits(configuration, getFooters(jobContext)));
+    }
+
+    return splits;
   }
 
   /**
@@ -264,9 +309,10 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    * @param footers the footers of the files to read
    * @return the splits for the footers
    * @throws IOException
+   * @deprecated split planning using file footers will be removed
    */
+  @Deprecated
   public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
-    boolean taskSideMetaData = isTaskSideMetaData(configuration);
     boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
     final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
@@ -279,7 +325,8 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
         globalMetaData.getKeyValueMetaData(),
         globalMetaData.getSchema()));
 
-    return SplitStrategy.getSplitStrategy(taskSideMetaData).getSplits(configuration, footers, maxSplitSize, minSplitSize, readContext);
+    return new ClientSideMetadataSplitStrategy().getSplits(
+        configuration, footers, maxSplitSize, minSplitSize, readContext);
   }
 
   /*
@@ -489,118 +536,8 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
 }
-abstract class SplitStrategy {
-  private static final Log LOG = Log.getLog(SplitStrategy.class);
 
-  static SplitStrategy getSplitStrategy(boolean taskSideMetaData) {
-    if (taskSideMetaData) {
-      LOG.info("Using Task Side Metadata Split Strategy");
-      return new TaskSideMetadataSplitStrategy();
-    } else {
-      LOG.info("Using Client Side Metadata Split Strategy");
-      return new ClientSideMetadataSplitStrategy();
-    }
-  }
-
-  abstract List<ParquetInputSplit> getSplits(
-      Configuration configuration,
-      List<Footer> footers,
-      final long maxSplitSize, final long minSplitSize,
-      ReadContext readContext) throws IOException;
-}
-class TaskSideMetadataSplitStrategy extends SplitStrategy {
-
-  @Override
-  List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
-      long maxSplitSize, long minSplitSize, ReadContext readContext) throws IOException {
-    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
-    for (Footer footer : footers) {
-      // TODO: keep status in Footer
-      final Path file = footer.getFile();
-      FileSystem fs = file.getFileSystem(configuration);
-      FileStatus fileStatus = fs.getFileStatus(file);
-      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-      splits.addAll(generateTaskSideMDSplits(
-          fileBlockLocations,
-          fileStatus,
-          readContext.getRequestedSchema().toString(),
-          readContext.getReadSupportMetadata(),
-          minSplitSize,
-          maxSplitSize));
-
-    }
-    return splits;
-  }
-
-  private static int findBlockIndex(BlockLocation[] hdfsBlocksArray, long endOffset) {
-    for (int i = 0; i < hdfsBlocksArray.length; i++) {
-      BlockLocation block = hdfsBlocksArray[i];
-      // end offset is exclusive. We want the block that contains the point right before.
-      if (endOffset > block.getOffset() && endOffset <= (block.getOffset() + block.getLength())) {
-        return i;
-      }
-    }
-    return -1;
-  }
-
-  static <T> List<ParquetInputSplit> generateTaskSideMDSplits(
-      BlockLocation[] hdfsBlocksArray,
-      FileStatus fileStatus,
-      String requestedSchema,
-      Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
-    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
-      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
-    }
-    //generate splits from rowGroups of each split
-    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
-    // [startOffset, endOffset)
-    long startOffset = 0;
-    long endOffset = 0;
-    // they should already be sorted
-    Arrays.sort(hdfsBlocksArray, new Comparator<BlockLocation>() {
-      @Override
-      public int compare(BlockLocation o1, BlockLocation o2) {
-        return compare(o1.getOffset(), o2.getOffset());
-      }
-      private int compare(long x, long y) {
-        return (x < y) ? -1 : ((x == y) ? 0 : 1);
-      }
-    });
-    final BlockLocation lastBlock =
-        hdfsBlocksArray.length == 0 ? null : hdfsBlocksArray[hdfsBlocksArray.length - 1];
-    while (endOffset < fileStatus.getLen()) {
-      startOffset = endOffset;
-      BlockLocation blockLocation;
-      final int nextBlockMin = findBlockIndex(hdfsBlocksArray, startOffset + minSplitSize);
-      final int nextBlockMax = findBlockIndex(hdfsBlocksArray, startOffset + maxSplitSize);
-      if (nextBlockMax == nextBlockMin && nextBlockMax != -1) {
-        // no block boundary between min and max
-        // => use max for the size of the split
-        endOffset = startOffset + maxSplitSize;
-        blockLocation = hdfsBlocksArray[nextBlockMax];
-      } else if (nextBlockMin > -1) {
-        // block boundary between min and max
-        // we end the split at the first block boundary
-        blockLocation = hdfsBlocksArray[nextBlockMin];
-        endOffset = blockLocation.getOffset() + blockLocation.getLength();
-      } else {
-        // min and max after last block
-        // small last split
-        endOffset = fileStatus.getLen();
-        blockLocation = lastBlock;
-      }
-      resultSplits.add(
-          new ParquetInputSplit(
-              fileStatus.getPath(),
-              startOffset, endOffset, endOffset - startOffset,
-              blockLocation == null ? new String[0] : blockLocation.getHosts(),
-              null,
-              requestedSchema, readSupportMetadata));
-    }
-    return resultSplits;
-  }
-}
-class ClientSideMetadataSplitStrategy extends SplitStrategy {
+class ClientSideMetadataSplitStrategy {
   //Wrapper of hdfs blocks, keep track of which HDFS block is being used
   private static class HDFSBlocks {
     BlockLocation[] hdfsBlocks;
@@ -712,16 +649,13 @@ class ClientSideMetadataSplitStrategy extends SplitStrategy {
               end,
               length,
               hdfsBlock.getHosts(),
-              rowGroupOffsets,
-              requestedSchema,
-              readSupportMetadata
+              rowGroupOffsets
       );
     }
   }
 
   private static final Log LOG = Log.getLog(ClientSideMetadataSplitStrategy.class);
 
-  @Override
   List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
       long maxSplitSize, long minSplitSize, ReadContext readContext)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
index 1be72d3..be09a3c 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -26,10 +26,8 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -38,7 +36,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
-import parquet.bytes.BytesUtils;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.schema.MessageType;
@@ -59,8 +56,6 @@ public class ParquetInputSplit extends FileSplit implements Writable {
 
   private long end;
   private long[] rowGroupOffsets;
-  private String requestedSchema;
-  private Map<String, String> readSupportMetadata;
 
   /**
    * Writables must have a parameterless constructor
@@ -71,7 +66,7 @@ public class ParquetInputSplit extends FileSplit implements Writable {
 
   /**
    * For compatibility only
-   * use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[], String, Map)}
+   * use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[])}
    * @param path
    * @param start
    * @param length
@@ -93,11 +88,7 @@ public class ParquetInputSplit extends FileSplit implements Writable {
       String fileSchema,
       Map<String, String> extraMetadata,
       Map<String, String> readSupportMetadata) {
-    this(
-        path, start, length, end(blocks, requestedSchema), hosts,
-        offsets(blocks),
-        requestedSchema, readSupportMetadata
-        );
+    this(path, start, length, end(blocks, requestedSchema), hosts, offsets(blocks));
   }
 
   private static long end(List<BlockMetaData> blocks, String requestedSchema) {
@@ -124,32 +115,46 @@ public class ParquetInputSplit extends FileSplit implements Writable {
   }
 
   /**
+   * Builds a {@code ParquetInputSplit} from a mapreduce {@link FileSplit}.
+   *
+   * @param split a mapreduce FileSplit
+   * @return a ParquetInputSplit
+   * @throws IOException
+   */
+  static ParquetInputSplit from(FileSplit split) throws IOException {
+    return new ParquetInputSplit(split.getPath(),
+        split.getStart(), split.getStart() + split.getLength(),
+        split.getLength(), split.getLocations(), null);
+  }
+
+  /**
+   * Builds a {@code ParquetInputSplit} from a mapred
+   * {@link org.apache.hadoop.mapred.FileSplit}.
+   *
+   * @param split a mapreduce FileSplit
+   * @return a ParquetInputSplit
+   * @throws IOException
+   */
+  static ParquetInputSplit from(org.apache.hadoop.mapred.FileSplit split) throws IOException {
+    return new ParquetInputSplit(split.getPath(),
+        split.getStart(), split.getStart() + split.getLength(),
+        split.getLength(), split.getLocations(), null);
+  }
+
+  /**
    * @param file the path of the file for that split
    * @param start the start offset in the file
    * @param end the end offset in the file
    * @param length the actual size in bytes that we expect to read
    * @param hosts the hosts with the replicas of this data
    * @param rowGroupOffsets the offsets of the rowgroups selected if loaded on the client
-   * @param requestedSchema the user requested schema
-   * @param readSupportMetadata metadata from the read support
    */
   public ParquetInputSplit(
       Path file, long start, long end, long length, String[] hosts,
-      long[] rowGroupOffsets,
-      String requestedSchema,
-      Map<String, String> readSupportMetadata) {
+      long[] rowGroupOffsets) {
     super(file, start, length, hosts);
     this.end = end;
     this.rowGroupOffsets = rowGroupOffsets;
-    this.requestedSchema = requestedSchema;
-    this.readSupportMetadata = readSupportMetadata;
-  }
-
-  /**
-   * @return the requested schema
-   */
-  public String getRequestedSchema() {
-    return requestedSchema;
   }
 
   /**
@@ -160,13 +165,6 @@ public class ParquetInputSplit extends FileSplit implements Writable {
   }
 
   /**
-   * @return app specific metadata provided by the read support in the init phase
-   */
-  public Map<String, String> getReadSupportMetadata() {
-    return readSupportMetadata;
-  }
-
-  /**
    * @return the offsets of the row group selected if this has been determined on the client side
    */
   public long[] getRowGroupOffsets() {
@@ -190,8 +188,6 @@ public class ParquetInputSplit extends FileSplit implements Writable {
         + " length: " + getLength()
         + " hosts: " + hosts
         + (rowGroupOffsets == null ? "" : (" row groups: " + Arrays.toString(rowGroupOffsets)))
-        + " requestedSchema: " +  requestedSchema
-        + " readSupportMetadata: " + readSupportMetadata
         + "}";
   }
 
@@ -210,8 +206,6 @@ public class ParquetInputSplit extends FileSplit implements Writable {
         rowGroupOffsets[i] = in.readLong();
       }
     }
-    this.requestedSchema = readUTF8(in);
-    this.readSupportMetadata = readKeyValues(in);
     in.close();
   }
 
@@ -231,22 +225,10 @@ public class ParquetInputSplit extends FileSplit implements Writable {
         out.writeLong(o);
       }
     }
-    writeUTF8(out, requestedSchema);
-    writeKeyValues(out, readSupportMetadata);
     out.close();
     writeArray(hout, baos.toByteArray());
   }
 
-  private static void writeUTF8(DataOutput out, String string) throws IOException {
-    byte[] bytes = string.getBytes(BytesUtils.UTF8);
-    writeArray(out, bytes);
-  }
-
-  private static String readUTF8(DataInput in) throws IOException {
-    byte[] bytes = readArray(in);
-    return new String(bytes, BytesUtils.UTF8).intern();
-  }
-
   private static void writeArray(DataOutput out, byte[] bytes) throws IOException {
     out.writeInt(bytes.length);
     out.write(bytes, 0, bytes.length);
@@ -259,27 +241,4 @@ public class ParquetInputSplit extends FileSplit implements Writable {
     return bytes;
   }
 
-  private Map<String, String> readKeyValues(DataInput in) throws IOException {
-    int size = in.readInt();
-    Map<String, String> map = new HashMap<String, String>(size);
-    for (int i = 0; i < size; i++) {
-      String key = readUTF8(in).intern();
-      String value = readUTF8(in).intern();
-      map.put(key, value);
-    }
-    return map;
-  }
-
-  private void writeKeyValues(DataOutput out, Map<String, String> map) throws IOException {
-    if (map == null) {
-      out.writeInt(0);
-    } else {
-      out.writeInt(map.size());
-      for (Entry<String, String> entry : map.entrySet()) {
-        writeUTF8(out, entry.getKey());
-        writeUTF8(out, entry.getValue());
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
index acb4984..4231ae4 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
@@ -25,8 +25,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -38,11 +36,8 @@ import parquet.filter.UnboundRecordFilter;
 import parquet.filter2.compat.FilterCompat;
 import parquet.filter2.compat.FilterCompat.Filter;
 import parquet.filter2.compat.RowGroupFilter;
-import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
 import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
 import parquet.schema.MessageType;
 
 /**
@@ -53,9 +48,7 @@ public class ParquetReader<T> implements Closeable {
 
   private final ReadSupport<T> readSupport;
   private final Configuration conf;
-  private final ReadContext readContext;
   private final Iterator<Footer> footersIterator;
-  private final GlobalMetaData globalMetaData;
   private final Filter filter;
 
   private InternalParquetRecordReader<T> reader;
@@ -125,10 +118,6 @@ public class ParquetReader<T> implements Closeable {
     }));
     List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
     this.footersIterator = footers.iterator();
-    globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
-    MessageType schema = globalMetaData.getSchema();
-    Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
-    readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
   }
 
   /**
@@ -158,12 +147,15 @@ public class ParquetReader<T> implements Closeable {
 
       List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
 
-      List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, footer.getParquetMetadata().getFileMetaData().getSchema());
+      MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
+
+      List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
+          filter, blocks, fileSchema);
 
       reader = new InternalParquetRecordReader<T>(readSupport, filter);
-      reader.initialize(
-          readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
-          readContext.getReadSupportMetadata(), footer.getFile(), filteredBlocks, conf);
+      reader.initialize(fileSchema,
+          footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
+          footer.getFile(), filteredBlocks, conf);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index 38c180d..abf65c1 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -40,18 +40,17 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import parquet.Log;
 import parquet.filter.UnboundRecordFilter;
 import parquet.filter2.compat.FilterCompat;
 import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.filter2.compat.RowGroupFilter;
 import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
 import parquet.hadoop.util.counters.BenchmarkCounter;
 import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
 
 /**
  * Reads the records from a block of a Parquet file
@@ -138,13 +137,13 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
               + context.getClass().getCanonicalName());
     }
 
-    initializeInternalReader((ParquetInputSplit)inputSplit, ContextUtil.getConfiguration(context));
+    initializeInternalReader(toParquetSplit(inputSplit), ContextUtil.getConfiguration(context));
   }
 
   public void initialize(InputSplit inputSplit, Configuration configuration, Reporter reporter)
       throws IOException, InterruptedException {
     BenchmarkCounter.initCounterFromReporter(reporter,configuration);
-    initializeInternalReader((ParquetInputSplit) inputSplit, configuration);
+    initializeInternalReader(toParquetSplit(inputSplit), configuration);
   }
 
   private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
@@ -189,14 +188,9 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
       }
     }
     MessageType fileSchema = footer.getFileMetaData().getSchema();
-    MessageType requestedSchema = MessageTypeParser.parseMessageType(split.getRequestedSchema());
     Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
-    Map<String, String> readSupportMetadata = split.getReadSupportMetadata();
     internalReader.initialize(
-        requestedSchema, fileSchema,
-        fileMetaData, readSupportMetadata,
-        path,
-        filteredBlocks, configuration);
+        fileSchema, fileMetaData, path, filteredBlocks, configuration);
   }
 
   /**
@@ -206,4 +200,18 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
   public boolean nextKeyValue() throws IOException, InterruptedException {
     return internalReader.nextKeyValue();
   }
+
+  private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
+    if (split instanceof ParquetInputSplit) {
+      return (ParquetInputSplit) split;
+    } else if (split instanceof FileSplit) {
+      return ParquetInputSplit.from((FileSplit) split);
+    } else if (split instanceof org.apache.hadoop.mapred.FileSplit) {
+      return ParquetInputSplit.from(
+          (org.apache.hadoop.mapred.FileSplit) split);
+    } else {
+      throw new IllegalArgumentException(
+          "Invalid split (not a FileSplit or ParquetInputSplit): " + split);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
index be6d4f9..2617376 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -18,6 +18,7 @@
  */
 package parquet.hadoop.mapred;
 
+import static java.lang.Boolean.TRUE;
 import static java.util.Arrays.asList;
 
 import java.io.DataInput;
@@ -25,6 +26,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -42,11 +44,15 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
   @Override
   public RecordReader<Void, Container<V>> getRecordReader(InputSplit split, JobConf job,
                   Reporter reporter) throws IOException {
-    return new RecordReaderWrapper<V>(realInputFormat, split, job, reporter);
+    return new RecordReaderWrapper<V>(split, job, reporter);
   }
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    if (isTaskSideMetaData(job)) {
+      return super.getSplits(job, numSplits);
+    }
+
     List<Footer> footers = getFooters(job);
     List<ParquetInputSplit> splits = realInputFormat.getSplits(job, footers);
     if (splits == null) {
@@ -74,18 +80,24 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
     private boolean firstRecord = false;
     private boolean eof = false;
 
-    public RecordReaderWrapper(ParquetInputFormat<V> newInputFormat,
-                               InputSplit oldSplit,
-                               JobConf oldJobConf,
-                               Reporter reporter) throws IOException {
-
+    public RecordReaderWrapper(
+        InputSplit oldSplit, JobConf oldJobConf, Reporter reporter)
+        throws IOException {
       splitLen = oldSplit.getLength();
 
       try {
-        realReader = new ParquetRecordReader<V>(newInputFormat.getReadSupport(oldJobConf),
+        realReader = new ParquetRecordReader<V>(
+            ParquetInputFormat.<V>getReadSupportInstance(oldJobConf),
             ParquetInputFormat.getFilter(oldJobConf));
 
-        realReader.initialize(((ParquetInputSplitWrapper)oldSplit).realSplit, oldJobConf, reporter);
+        if (oldSplit instanceof ParquetInputSplitWrapper) {
+          realReader.initialize(((ParquetInputSplitWrapper) oldSplit).realSplit, oldJobConf, reporter);
+        } else if (oldSplit instanceof FileSplit) {
+          realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
+        } else {
+          throw new IllegalArgumentException(
+              "Invalid split (not a FileSplit or ParquetInputSplitWrapper): " + oldSplit);
+        }
 
         // read once to gain access to key and value objects
         if (realReader.nextKeyValue()) {
@@ -157,6 +169,10 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
     }
   }
 
+  public static boolean isTaskSideMetaData(JobConf job) {
+    return job.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, TRUE);
+  }
+
   private static class ParquetInputSplitWrapper implements InputSplit {
 
     ParquetInputSplit realSplit;

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
index 369ca13..69deeef 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
@@ -35,7 +35,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -108,17 +107,6 @@ public class TestInputFormat {
   }
 
   @Test
-  public void testThrowExceptionWhenMaxSplitSizeIsSmallerThanMinSplitSizeTaskSide() throws IOException {
-    try {
-      generateTSSplitByMinMaxSize(50, 49);
-      fail("should throw exception when max split size is smaller than the min split size");
-    } catch (ParquetDecodingException e) {
-      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = 49; minSplitSize is 50"
-              , e.getMessage());
-    }
-  }
-
-  @Test
   public void testThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
     try {
       generateSplitByMinMaxSize(-100, -50);
@@ -130,17 +118,6 @@ public class TestInputFormat {
   }
 
   @Test
-  public void testTSThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
-    try {
-      generateTSSplitByMinMaxSize(-100, -50);
-      fail("should throw exception when max split size is negative");
-    } catch (ParquetDecodingException e) {
-      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = -50; minSplitSize is -100"
-              , e.getMessage());
-    }
-  }
-
-  @Test
   public void testGetFilter() throws IOException {
     IntColumn intColumn = intColumn("foo");
     FilterPredicate p = or(eq(intColumn, 7), eq(intColumn, 12));
@@ -178,21 +155,6 @@ public class TestInputFormat {
   }
 
   @Test
-  public void testTSGenerateSplitsAlignedWithHDFSBlock() throws IOException {
-    withHDFSBlockSize(50, 50);
-    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(50, 50);
-    shouldSplitStartBe(splits, 0, 50);
-    shouldSplitLocationBe(splits, 0, 1);
-    shouldSplitLengthBe(splits, 50, 50);
-
-    splits = generateSplitByMinMaxSize(0, Long.MAX_VALUE);
-    shouldSplitStartBe(splits, 0, 50);
-    shouldSplitLocationBe(splits, 0, 1);
-    shouldSplitLengthBe(splits, 50, 50);
-
-  }
-
-  @Test
   public void testRowGroupNotAlignToHDFSBlock() throws IOException {
     //Test HDFS blocks size(51) is not multiple of row group size(10)
     withHDFSBlockSize(51, 51);
@@ -221,32 +183,6 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 40, 50, 10);
   }
 
-  @Test
-  public void testTSRowGroupNotAlignToHDFSBlock() throws IOException {
-    //Test HDFS blocks size(51) is not multiple of row group size(10)
-    withHDFSBlockSize(51, 51);
-    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(50, 50);
-    shouldSplitStartBe(splits, 0, 50, 100);
-    shouldSplitLocationBe(splits, 0, 1, 1);
-    shouldSplitLengthBe(splits, 50, 50, 2);
-
-    //Test a rowgroup is greater than the hdfsBlock boundary
-    withHDFSBlockSize(49, 49);
-    splits = generateTSSplitByMinMaxSize(50, 50);
-    shouldSplitStartBe(splits, 0, 50);
-    shouldSplitLocationBe(splits, 1, 1);
-    shouldSplitLengthBe(splits, 50, 48);
-
-    /*
-    aaaa bbbbb c
-     */
-    withHDFSBlockSize(44,44,44);
-    splits = generateTSSplitByMinMaxSize(40, 50);
-    shouldSplitStartBe(splits, 0, 44, 88);
-    shouldSplitLocationBe(splits, 0, 1, 2);
-    shouldSplitLengthBe(splits, 44, 44, 44);
-  }
-
   /*
     when min size is 55, max size is 56, the first split will be generated with 6 row groups(size of 10 each), which satisfies split.size>min.size, but not split.size<max.size
     aaaaa abbbb
@@ -273,28 +209,6 @@ public class TestInputFormat {
 
   }
 
-  @Test
-  public void testTSGenerateSplitsNotAlignedWithHDFSBlock() throws IOException, InterruptedException {
-    withHDFSBlockSize(50, 50);
-    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(55, 56);
-    shouldSplitStartBe(splits, 0, 56);
-    shouldSplitLocationBe(splits, 1, 1);
-    shouldSplitLengthBe(splits, 56, 44);
-
-    withHDFSBlockSize(51, 51);
-    splits = generateTSSplitByMinMaxSize(55, 56);
-    shouldSplitStartBe(splits, 0, 56);
-    shouldSplitLocationBe(splits, 1, 1);
-    shouldSplitLengthBe(splits, 56, 46);
-
-    withHDFSBlockSize(49, 49, 49);
-    splits = generateTSSplitByMinMaxSize(55, 56);
-    shouldSplitStartBe(splits, 0, 56, 112);
-    shouldSplitLocationBe(splits, 1, 2, 2);
-    shouldSplitLengthBe(splits, 56, 56, 35);
-
-  }
-
   /*
     when the max size is set to be 30, first split will be of size 30,
     and when creating second split, it will try to align it to second hdfsBlock, and therefore generates a split of size 20
@@ -327,33 +241,6 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 30, 20, 30, 20);
   }
 
-  @Test
-  public void testTSGenerateSplitsSmallerThanMaxSizeAndAlignToHDFS() throws Exception {
-    withHDFSBlockSize(50, 50);
-    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(18, 30);
-    shouldSplitStartBe(splits, 0, 30, 50, 80);
-    shouldSplitLocationBe(splits, 0, 0, 1, 1);
-    shouldSplitLengthBe(splits, 30, 20, 30, 20);
-
-    /*
-    aaabb cccdd
-         */
-    withHDFSBlockSize(51, 51);
-    splits = generateTSSplitByMinMaxSize(18, 30);
-    shouldSplitStartBe(splits, 0, 30, 51, 81);
-    shouldSplitLocationBe(splits, 0, 0, 1, 1);
-    shouldSplitLengthBe(splits, 30, 21, 30, 21);
-
-    /*
-    aaabb cccdd
-     */
-    withHDFSBlockSize(49, 49, 49);
-    splits = generateTSSplitByMinMaxSize(18, 30);
-    shouldSplitStartBe(splits, 0, 30, 49, 79, 98, 128);
-    shouldSplitLocationBe(splits, 0, 0, 1, 1, 2, 2);
-    shouldSplitLengthBe(splits, 30, 19, 30, 19, 30, 19);
-  }
-
   /*
     when the min size is set to be 25, so the second split can not be aligned with the boundary of hdfs block, there for split of size 30 will be created as the 3rd split.
     aaabb bcccd
@@ -367,15 +254,6 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 30, 30, 30, 10);
   }
 
-  @Test
-  public void testTSGenerateSplitsCrossHDFSBlockBoundaryToSatisfyMinSize() throws Exception {
-    withHDFSBlockSize(50, 50);
-    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(25, 30);
-    shouldSplitStartBe(splits, 0, 30, 60, 90);
-    shouldSplitLocationBe(splits, 0, 1, 1, 1);
-    shouldSplitLengthBe(splits, 30, 30, 30, 10);
-  }
-
   /*
     when rowGroups size is 10, but min split size is 10, max split size is 18, it will create splits of size 20 and of size 10 and align with hdfsBlocks
     aabbc ddeef
@@ -411,37 +289,6 @@ public class TestInputFormat {
     shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
   }
 
-  @Test
-  public void testTSMultipleRowGroupsInABlockToAlignHDFSBlock() throws Exception {
-    withHDFSBlockSize(50, 50);
-    List<ParquetInputSplit> splits = generateTSSplitByMinMaxSize(10, 18);
-    shouldSplitStartBe(splits, 0, 18, 36, 50, 68, 86);
-    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
-    shouldSplitLengthBe(splits, 18, 18, 14, 18, 18, 14);
-
-    /*
-    aabbc ddeef
-    notice the first byte of split d is in the first hdfs block:
-    when adding the 6th row group, although the first byte of it is in the first hdfs block
-    , but the mid point of the row group is in the second hdfs block, there for a new split(d) is created including that row group
-     */
-    withHDFSBlockSize(51, 51);
-    splits = generateTSSplitByMinMaxSize(10, 18);
-    shouldSplitStartBe(splits, 0, 18, 36, 51, 69, 87);
-    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
-    shouldSplitLengthBe(splits, 18, 18, 15, 18, 18, 15);
-
-    /*
-    aabbc ddeef
-    same as the case where block sizes are 50 50
-     */
-    withHDFSBlockSize(49, 49);
-    splits = generateTSSplitByMinMaxSize(10, 18);
-    shouldSplitStartBe(splits, 0, 18, 36, 49, 67, 85);
-    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
-    shouldSplitLengthBe(splits, 18, 18, 13, 18, 18, 13);
-  }
-
   public static final class DummyUnboundRecordFilter implements UnboundRecordFilter {
     @Override
     public RecordFilter bind(Iterable<ColumnReader> readers) {
@@ -564,15 +411,6 @@ public class TestInputFormat {
         min, max);
   }
 
-  private List<ParquetInputSplit> generateTSSplitByMinMaxSize(long min, long max) throws IOException {
-    return TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(
-        hdfsBlocks,
-        fileStatus,
-        schema.toString(),
-        extramd,
-        min, max);
-  }
-
   private List<ParquetInputSplit> generateSplitByDeprecatedConstructor(long min, long max) throws
       IOException {
     List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
@@ -611,7 +449,6 @@ public class TestInputFormat {
     for (int i = 0; i < locations.length; i++) {
       int loc = locations[i];
       ParquetInputSplit split = splits.get(i);
-      assertEquals(message(splits) + i, "foo", split.getReadSupportMetadata().get("specific"));
       assertEquals(message(splits) + i, "[foo" + loc + ".datanode, bar" + loc + ".datanode]", Arrays.toString(split.getLocations()));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ce65dfb3/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 2ca4ff4..4ff97ea 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -220,9 +220,7 @@ public class ParquetRecordReaderWrapper  implements RecordReader<Void, ArrayWrit
                 splitStart + splitLength,
                 splitLength,
                 fileSplit.getLocations(),
-                null,
-                readContext.getRequestedSchema().toString(),
-                readContext.getReadSupportMetadata());
+                null);
     } else {
       throw new IllegalArgumentException("Unknown split type: " + oldSplit);
     }