You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:05 UTC

[08/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
deleted file mode 100644
index 6b7b9b0..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ /dev/null
@@ -1,778 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static java.lang.Boolean.TRUE;
-import static parquet.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import parquet.Log;
-import parquet.Preconditions;
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.filter2.compat.RowGroupFilter;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.hadoop.util.ConfigurationUtil;
-import parquet.hadoop.util.ContextUtil;
-import parquet.hadoop.util.HiddenFileFilter;
-import parquet.hadoop.util.SerializationUtil;
-import parquet.io.ParquetDecodingException;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-/**
- * The input format to read a Parquet file.
- *
- * It requires an implementation of {@link ReadSupport} to materialize the records.
- *
- * The requestedSchema will control how the original records get projected by the loader.
- * It must be a subset of the original schema. Only the columns needed to reconstruct the records with the requestedSchema will be scanned.
- *
- * @see #READ_SUPPORT_CLASS
- * @see #UNBOUND_RECORD_FILTER
- * @see #STRICT_TYPE_CHECKING
- * @see #FILTER_PREDICATE
- * @see #TASK_SIDE_METADATA
- *
- * @author Julien Le Dem
- *
- * @param <T> the type of the materialized records
- */
-public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
-
-  private static final Log LOG = Log.getLog(ParquetInputFormat.class);
-
-  /**
-   * key to configure the ReadSupport implementation
-   */
-  public static final String READ_SUPPORT_CLASS = "parquet.read.support.class";
-
-  /**
-   * key to configure the filter
-   */
-  public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
-
-  /**
-   * key to configure type checking for conflicting schemas (default: true)
-   */
-  public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";
-
-  /**
-   * key to configure the filter predicate
-   */
-  public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
-
-  /**
-   * key to turn on or off task side metadata loading (default true)
-   * if true then metadata is read on the task side and some tasks may finish immediately.
-   * if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawned if there is work to do.
-   */
-  public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata";
-
-  private static final int MIN_FOOTER_CACHE_SIZE = 100;
-
-  public static void setTaskSideMetaData(Job job,  boolean taskSideMetadata) {
-    ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
-  }
-
-  public static boolean isTaskSideMetaData(Configuration configuration) {
-    return configuration.getBoolean(TASK_SIDE_METADATA, TRUE);
-  }
-
-  public static void setReadSupportClass(Job job,  Class<?> readSupportClass) {
-    ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
-  }
-
-  public static void setUnboundRecordFilter(Job job, Class<? extends UnboundRecordFilter> filterClass) {
-    Configuration conf = ContextUtil.getConfiguration(job);
-    checkArgument(getFilterPredicate(conf) == null,
-        "You cannot provide an UnboundRecordFilter after providing a FilterPredicate");
-
-    conf.set(UNBOUND_RECORD_FILTER, filterClass.getName());
-  }
-
-  /**
-   * @deprecated use {@link #getFilter(Configuration)}
-   */
-  @Deprecated
-  public static Class<?> getUnboundRecordFilter(Configuration configuration) {
-    return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
-  }
-
-  /**
-   * Instantiates the UnboundRecordFilter class named in the configuration, if there is one.
-   * A filter that is also {@link Configurable} receives the configuration before it is returned.
-   *
-   * @param configuration the configuration to read the filter class from
-   * @return a new filter instance, or null when the class lookup yields null
-   */
-  private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
-    Class<?> filterClass =
-        ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
-    if (filterClass == null) {
-      return null;
-    }
-
-    UnboundRecordFilter filter;
-    try {
-      filter = (UnboundRecordFilter) filterClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new BadConfigurationException("could not instantiate unbound record filter class", e);
-    } catch (IllegalAccessException e) {
-      throw new BadConfigurationException("could not instantiate unbound record filter class", e);
-    }
-
-    if (filter instanceof Configurable) {
-      Configurable configurableFilter = (Configurable) filter;
-      configurableFilter.setConf(configuration);
-    }
-    return filter;
-  }
-
-  public static void setReadSupportClass(JobConf conf, Class<?> readSupportClass) {
-    conf.set(READ_SUPPORT_CLASS, readSupportClass.getName());
-  }
-
-  public static Class<?> getReadSupportClass(Configuration configuration) {
-    return ConfigurationUtil.getClassFromConfig(configuration, READ_SUPPORT_CLASS, ReadSupport.class);
-  }
-
-  public static void setFilterPredicate(Configuration configuration, FilterPredicate filterPredicate) {
-    checkArgument(getUnboundRecordFilter(configuration) == null,
-        "You cannot provide a FilterPredicate after providing an UnboundRecordFilter");
-
-    configuration.set(FILTER_PREDICATE + ".human.readable", filterPredicate.toString());
-    try {
-      SerializationUtil.writeObjectToConfAsBase64(FILTER_PREDICATE, filterPredicate, configuration);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Reads back the object stored under {@link #FILTER_PREDICATE} in the configuration.
-   *
-   * @param configuration the configuration to read from
-   * @return the value produced by the Base64 lookup for the predicate key
-   */
-  private static FilterPredicate getFilterPredicate(Configuration configuration) {
-    FilterPredicate predicate;
-    try {
-      predicate = SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-    return predicate;
-  }
-
-  /**
-   * Returns a non-null Filter, which is a wrapper around either a
-   * FilterPredicate, an UnboundRecordFilter, or a no-op filter.
-   */
-  public static Filter getFilter(Configuration conf) {
-    return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
-  }
-
-  private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
-
-  private final Class<? extends ReadSupport<T>> readSupportClass;
-
-  /**
-   * Hadoop will instantiate using this constructor.
-   * <p>
-   * No ReadSupport class is fixed here, so {@link #getReadSupport(Configuration)}
-   * takes the class from the configuration instead.
-   */
-  public ParquetInputFormat() {
-    this.readSupportClass = null;
-  }
-
-  /**
-   * Constructor for subclasses, such as AvroParquetInputFormat, or wrappers.
-   * <p>
-   * Subclasses and wrappers may use this constructor to set the ReadSupport
-   * class that will be used when reading instead of requiring the user to set
-   * the read support property in their configuration.
-   *
-   * @param readSupportClass a ReadSupport subclass
-   * @param <S> the concrete ReadSupport type being registered
-   */
-  public <S extends ReadSupport<T>> ParquetInputFormat(Class<S> readSupportClass) {
-    this.readSupportClass = readSupportClass;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public RecordReader<Void, T> createRecordReader(
-      InputSplit inputSplit,
-      TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    Configuration taskConf = ContextUtil.getConfiguration(taskAttemptContext);
-    // the read support is resolved before the filter, both from the task configuration
-    ReadSupport<T> support = getReadSupport(taskConf);
-    Filter recordFilter = getFilter(taskConf);
-    return new ParquetRecordReader<T>(support, recordFilter);
-  }
-
-  /**
-   * Instantiates the read support, preferring the class given at construction time and
-   * falling back to the class named in the configuration.
-   *
-   * @param configuration to find the configuration for the read support
-   * @return the configured read support
-   * @deprecated use getReadSupportInstance static methods instead
-   */
-  @Deprecated
-  @SuppressWarnings("unchecked")
-  ReadSupport<T> getReadSupport(Configuration configuration){
-    Class<? extends ReadSupport<T>> supportClass;
-    if (readSupportClass != null) {
-      supportClass = readSupportClass;
-    } else {
-      // unchecked: the configuration lookup only yields a Class<?>
-      supportClass = (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration);
-    }
-    return getReadSupportInstance(supportClass);
-  }
-
-  /**
-   * Instantiates the ReadSupport class named under {@link #READ_SUPPORT_CLASS}.
-   *
-   * @param configuration to find the configuration for the read support
-   * @param <T> the type of the materialized records
-   * @return the configured read support
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> ReadSupport<T> getReadSupportInstance(Configuration configuration){
-    return getReadSupportInstance(
-        // unchecked: the configuration lookup only yields a Class<?>
-        (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration));
-  }
-
-  /**
-   * Creates a ReadSupport through the no-argument constructor of the given class.
-   * Reflection failures are rethrown as BadConfigurationException with the cause attached.
-   *
-   * @param readSupportClass to instantiate
-   * @param <T> the type of the materialized records
-   * @return the configured read support
-   */
-  @SuppressWarnings("unchecked")
-  static <T> ReadSupport<T> getReadSupportInstance(
-      Class<? extends ReadSupport<T>> readSupportClass){
-    try {
-      // NOTE(review): a null class (presumably when nothing is configured) would surface
-      // here as a NullPointerException rather than a BadConfigurationException - confirm
-      // whether callers guarantee a non-null class.
-      return readSupportClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new BadConfigurationException("could not instantiate read support class", e);
-    } catch (IllegalAccessException e) {
-      throw new BadConfigurationException("could not instantiate read support class", e);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
-    Configuration configuration = ContextUtil.getConfiguration(jobContext);
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    if (isTaskSideMetaData(configuration)) {
-      // Although not required by the API, some clients may depend on always
-      // receiving ParquetInputSplit. Translation is required at some point.
-      for (InputSplit split : super.getSplits(jobContext)) {
-        Preconditions.checkArgument(split instanceof FileSplit,
-            "Cannot wrap non-FileSplit: " + split);
-        splits.add(ParquetInputSplit.from((FileSplit) split));
-      }
-      return splits;
-
-    } else {
-      splits.addAll(getSplits(configuration, getFooters(jobContext)));
-    }
-
-    return splits;
-  }
-
-  /**
-   * Plans splits on the client from already-read footers: validates the configured split
-   * sizes, merges the footers into global metadata, initializes the read support with it
-   * and hands the result to {@link ClientSideMetadataSplitStrategy}.
-   *
-   * @param configuration the configuration to connect to the file system
-   * @param footers the footers of the files to read
-   * @return the splits for the footers
-   * @throws IOException
-   * @deprecated split planning using file footers will be removed
-   */
-  @Deprecated
-  public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
-    boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
-    final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
-    final long configuredMinSplitSize = configuration.getLong("mapred.min.split.size", 0L);
-    final long minSplitSize = Math.max(getFormatMinSplitSize(), configuredMinSplitSize);
-    boolean sizesAreValid = maxSplitSize >= 0 && minSplitSize >= 0;
-    if (!sizesAreValid) {
-      throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
-    }
-
-    GlobalMetaData mergedMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
-    ReadSupport<T> support = getReadSupport(configuration);
-    InitContext initContext = new InitContext(
-        configuration,
-        mergedMetaData.getKeyValueMetaData(),
-        mergedMetaData.getSchema());
-    ReadContext readContext = support.init(initContext);
-
-    ClientSideMetadataSplitStrategy strategy = new ClientSideMetadataSplitStrategy();
-    return strategy.getSplits(configuration, footers, maxSplitSize, minSplitSize, readContext);
-  }
-
-  /*
-   * This is to support multi-level/recursive directory listing until
-   * MAPREDUCE-1577 is fixed: directories returned by the parent listing are
-   * expanded into the files below them.
-   */
-  @Override
-  protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
-    List<FileStatus> topLevelStatuses = super.listStatus(jobContext);
-    Configuration conf = ContextUtil.getConfiguration(jobContext);
-    return getAllFileRecursively(topLevelStatuses, conf);
-  }
-
-  /**
-   * Flattens a listing: plain files are kept as they are, directories are replaced by the
-   * non-hidden files found beneath them.
-   *
-   * @param files the statuses to expand
-   * @param conf the configuration used to resolve the file system of each directory
-   * @return the collected file statuses
-   * @throws IOException if a directory cannot be listed
-   */
-  private static List<FileStatus> getAllFileRecursively(
-      List<FileStatus> files, Configuration conf) throws IOException {
-    List<FileStatus> collected = new ArrayList<FileStatus>();
-    for (FileStatus candidate : files) {
-      if (!candidate.isDir()) {
-        collected.add(candidate);
-        continue;
-      }
-      Path directory = candidate.getPath();
-      FileSystem directoryFs = directory.getFileSystem(conf);
-      staticAddInputPathRecursively(collected, directoryFs, directory, HiddenFileFilter.INSTANCE);
-    }
-    LOG.info("Total input paths to process : " + collected.size());
-    return collected;
-  }
-
-  /**
-   * Walks the directory tree below the given path depth-first and appends every file that
-   * passes the filter to the result list.
-   *
-   * @param result the list the files are appended to
-   * @param fs the file system used for listing
-   * @param path the directory to list
-   * @param inputFilter the filter applied to every listing
-   * @throws IOException if a directory cannot be listed
-   */
-  private static void staticAddInputPathRecursively(List<FileStatus> result,
-      FileSystem fs, Path path, PathFilter inputFilter)
-          throws IOException {
-    FileStatus[] children = fs.listStatus(path, inputFilter);
-    for (int i = 0; i < children.length; i++) {
-      FileStatus child = children[i];
-      if (!child.isDir()) {
-        result.add(child);
-        continue;
-      }
-      staticAddInputPathRecursively(result, fs, child.getPath(), inputFilter);
-    }
-  }
-
-  /**
-   * Returns the footers for the input files of the job. Footers that the footers cache
-   * already holds are taken from it; the remaining files are read and their footers are
-   * put into the cache before being returned.
-   *
-   * @param jobContext the current job context
-   * @return the footers for the files
-   * @throws IOException
-   */
-  public List<Footer> getFooters(JobContext jobContext) throws IOException {
-    List<FileStatus> statuses = listStatus(jobContext);
-    if (statuses.isEmpty()) {
-      return Collections.emptyList();
-    }
-    Configuration config = ContextUtil.getConfiguration(jobContext);
-    List<Footer> footers = new ArrayList<Footer>(statuses.size());
-    Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
-    // Size the map from the listing: the set of missing statuses is still empty at this
-    // point, so using its size would always request a capacity of zero.
-    Map<Path, FileStatusWrapper> missingStatusesMap =
-            new HashMap<Path, FileStatusWrapper>(statuses.size());
-
-    // the cache is created lazily, on the first call that has files to look up
-    if (footersCache == null) {
-      footersCache =
-              new LruCache<FileStatusWrapper, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
-    }
-    for (FileStatus status : statuses) {
-      FileStatusWrapper statusWrapper = new FileStatusWrapper(status);
-      FootersCacheValue cacheEntry =
-              footersCache.getCurrentValue(statusWrapper);
-      if (Log.DEBUG) {
-        LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "")
-                + " found for '" + status.getPath() + "'");
-      }
-      if (cacheEntry != null) {
-        footers.add(cacheEntry.getFooter());
-      } else {
-        missingStatuses.add(status);
-        missingStatusesMap.put(status.getPath(), statusWrapper);
-      }
-    }
-    if (Log.DEBUG) {
-      LOG.debug("found " + footers.size() + " footers in cache and adding up "
-              + "to " + missingStatuses.size() + " missing footers to the cache");
-    }
-
-
-    if (missingStatuses.isEmpty()) {
-      return footers;
-    }
-
-    List<Footer> newFooters = getFooters(config, missingStatuses);
-    for (Footer newFooter : newFooters) {
-      // Use the original file status objects to make sure we store a
-      // conservative (older) modification time (i.e. in case the files and
-      // footers were modified and it's not clear which version of the footers
-      // we have)
-      FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
-      footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
-    }
-
-    footers.addAll(newFooters);
-    return footers;
-  }
-
-  /**
-   * List-typed convenience overload that forwards to the Collection-based variant.
-   *
-   * @param configuration to connect to the file system
-   * @param statuses the files to open
-   * @return the footers of the files
-   * @throws IOException
-   */
-  public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
-    // widening the static type selects the Collection overload instead of recursing
-    Collection<FileStatus> statusCollection = statuses;
-    return getFooters(configuration, statusCollection);
-  }
-
-  /**
-   * Reads the footers of the given files through ParquetFileReader, passing along the
-   * task-side metadata flag taken from the configuration.
-   *
-   * @param configuration to connect to the file system
-   * @param statuses the files to open
-   * @return the footers of the files
-   * @throws IOException
-   */
-  public List<Footer> getFooters(Configuration configuration, Collection<FileStatus> statuses) throws IOException {
-    if (Log.DEBUG) {
-      LOG.debug("reading " + statuses.size() + " files");
-    }
-    return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
-        configuration, statuses, isTaskSideMetaData(configuration));
-  }
-
-  /**
-   * Reads the footers of the job's input files and merges them.
-   *
-   * @param jobContext the current job context
-   * @return the merged metadata from the footers
-   * @throws IOException
-   */
-  public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOException {
-    List<Footer> footers = getFooters(jobContext);
-    return ParquetFileWriter.getGlobalMetaData(footers);
-  }
-
-  /**
-   * A simple wrapper around {@link parquet.hadoop.Footer} that also includes a
-   * modification time associated with that footer.  The modification time is
-   * used to determine whether the footer is still current.
-   */
-  static final class FootersCacheValue
-          implements LruCache.Value<FileStatusWrapper, FootersCacheValue> {
-    private final long modificationTime;
-    private final Footer footer;
-
-    /**
-     * @param status supplies the modification time remembered for the footer
-     * @param footer the footer to cache; its file and metadata are copied into a new Footer
-     */
-    public FootersCacheValue(FileStatusWrapper status, Footer footer) {
-      modificationTime = status.getModificationTime();
-      Path footerFile = footer.getFile();
-      this.footer = new Footer(footerFile, footer.getParquetMetadata());
-    }
-
-    /**
-     * @param key the status the cached footer is checked against
-     * @return true unless the key carries a modification time later than the cached one
-     */
-    @Override
-    public boolean isCurrent(FileStatusWrapper key) {
-      long currentModTime = key.getModificationTime();
-      boolean stale = modificationTime < currentModTime;
-      if (stale && Log.DEBUG) {
-        LOG.debug("The cache value for '" + key + "' is not current: "
-                + "cached modification time=" + modificationTime + ", "
-                + "current modification time: " + currentModTime);
-      }
-      return !stale;
-    }
-
-    /** @return the cached footer */
-    public Footer getFooter() {
-      return footer;
-    }
-
-    /**
-     * @param otherValue the value to compare against, may be null
-     * @return true when there is no other value or this one has a later modification time
-     */
-    @Override
-    public boolean isNewerThan(FootersCacheValue otherValue) {
-      if (otherValue == null) {
-        return true;
-      }
-      return modificationTime > otherValue.modificationTime;
-    }
-
-    /** @return the file the cached footer belongs to */
-    public Path getPath() {
-      return footer.getFile();
-    }
-  }
-
-  /**
-   * A simple wrapper around {@link org.apache.hadoop.fs.FileStatus} with a
-   * meaningful "toString()" method
-   */
-  static final class FileStatusWrapper {
-    private final FileStatus status;
-    public FileStatusWrapper(FileStatus fileStatus) {
-      if (fileStatus == null) {
-        throw new IllegalArgumentException("FileStatus object cannot be null");
-      }
-      status = fileStatus;
-    }
-
-    public long getModificationTime() {
-      return status.getModificationTime();
-    }
-
-    @Override
-    public int hashCode() {
-      return status.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      return other instanceof FileStatusWrapper &&
-              status.equals(((FileStatusWrapper) other).status);
-    }
-
-    @Override
-    public String toString() {
-      return status.getPath().toString();
-    }
-  }
-
-}
-
-class ClientSideMetadataSplitStrategy {
-  /**
-   * Wrapper of hdfs blocks, keeps track of which HDFS block is being used.
-   * The two indexes only ever move forward, so row groups have to be presented in
-   * ascending offset order.
-   */
-  private static class HDFSBlocks {
-    BlockLocation[] hdfsBlocks;
-    // the hdfs block index corresponding to the start of a row group
-    int currentStartHdfsBlockIndex = 0;
-    // the hdfs block index corresponding to the mid-point of a row group, a split might be
-    // created only when the midpoint of the rowgroup enters a new hdfs block
-    int currentMidPointHDFSBlockIndex = 0;
-
-    /**
-     * @param hdfsBlocks the blocks of one file; the array is sorted in place by offset
-     */
-    private HDFSBlocks(BlockLocation[] hdfsBlocks) {
-      this.hdfsBlocks = hdfsBlocks;
-      Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
-        @Override
-        public int compare(BlockLocation b1, BlockLocation b2) {
-          return Long.signum(b1.getOffset() - b2.getOffset());
-        }
-      };
-      Arrays.sort(hdfsBlocks, comparator);
-    }
-
-    /**
-     * @param hdfsBlockIndex index into the sorted block array
-     * @return the offset of the last byte of that block
-     */
-    private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
-      BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
-      return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
-    }
-
-    /**
-     * Advances both block indexes until they cover the mid point and the start of the
-     * given row group.
-     *
-     * @param rowGroupMetadata the row group to locate
-     * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
-     * return false if the mid point of row group is in the same hdfs block
-     * @throws ParquetDecodingException if the mid point or the start of the row group lies
-     *         beyond the last hdfs block
-     */
-    private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
-      boolean isNewHdfsBlock = false;
-      long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
-
-      //if mid point is not in the current HDFS block any more, return true
-      while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
-        isNewHdfsBlock = true;
-        currentMidPointHDFSBlockIndex++;
-        if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length) {
-          throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
-                  + rowGroupMidPoint
-                  + ", the end of the hdfs block is "
-                  + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
-        }
-      }
-
-      while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
-        currentStartHdfsBlockIndex++;
-        if (currentStartHdfsBlockIndex >= hdfsBlocks.length) {
-          // The index is already one past the last block here, so report the end of the
-          // last block (index - 1); indexing with the current value would fail with an
-          // ArrayIndexOutOfBoundsException before this exception could be built.
-          throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
-                  + rowGroupMetadata.getStartingPos()
-                  + " but the end of hdfs blocks of file is "
-                  + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex - 1));
-        }
-      }
-      return isNewHdfsBlock;
-    }
-
-    /** @return the hdfs block that contains the start of the last checked row group */
-    public BlockLocation getCurrentBlock() {
-      return hdfsBlocks[currentStartHdfsBlockIndex];
-    }
-  }
-
-  /**
-   * Collects the row groups that make up one split, together with the hdfs block the
-   * split is attributed to and the summed compressed size of those row groups.
-   */
-  static class SplitInfo {
-    List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
-    BlockLocation hdfsBlock;
-    long compressedByteSize = 0L;
-
-    /**
-     * @param currentBlock the hdfs block this split is attributed to
-     */
-    public SplitInfo(BlockLocation currentBlock) {
-      hdfsBlock = currentBlock;
-    }
-
-    /** Appends a row group and adds its compressed size to the running total. */
-    private void addRowGroup(BlockMetaData rowGroup) {
-      rowGroups.add(rowGroup);
-      compressedByteSize = compressedByteSize + rowGroup.getCompressedSize();
-    }
-
-    public long getCompressedByteSize() {
-      return compressedByteSize;
-    }
-
-    public List<BlockMetaData> getRowGroups() {
-      return rowGroups;
-    }
-
-    int getRowGroupCount() {
-      return rowGroups.size();
-    }
-
-    /**
-     * Builds the input split for the collected row groups.
-     *
-     * @param fileStatus the file the row groups belong to
-     * @param requestedSchema the requested schema, in its string form
-     * @param readSupportMetadata not read by this method
-     * @return a split that starts at the hdfs block offset, ends after the last row group
-     *         and whose length is the total size of the requested column chunks
-     * @throws IOException if the hosts of the hdfs block cannot be retrieved
-     */
-    public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
-      MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
-
-      // only column chunks that are part of the requested schema count towards the length
-      long length = 0;
-      List<BlockMetaData> groups = this.getRowGroups();
-      for (int g = 0; g < groups.size(); g++) {
-        for (ColumnChunkMetaData column : groups.get(g).getColumns()) {
-          if (!requested.containsPath(column.getPath().toArray())) {
-            continue;
-          }
-          length += column.getTotalSize();
-        }
-      }
-
-      int groupCount = this.getRowGroupCount();
-      BlockMetaData lastRowGroup = this.getRowGroups().get(groupCount - 1);
-      long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
-
-      long[] rowGroupOffsets = new long[groupCount];
-      int next = 0;
-      for (BlockMetaData rowGroup : this.getRowGroups()) {
-        if (next >= rowGroupOffsets.length) {
-          break;
-        }
-        rowGroupOffsets[next++] = rowGroup.getStartingPos();
-      }
-
-      Path filePath = fileStatus.getPath();
-      long start = hdfsBlock.getOffset();
-      String[] hosts = hdfsBlock.getHosts();
-      return new ParquetInputSplit(
-              filePath,
-              start,
-              end,
-              length,
-              hosts,
-              rowGroupOffsets
-      );
-    }
-  }
-
-  private static final Log LOG = Log.getLog(ClientSideMetadataSplitStrategy.class);
-
-  /**
-   * Plans the splits for all footers: the row groups of every file are run through the
-   * configured filter, and splits are generated for the files that keep at least one
-   * row group. The number of dropped row groups is logged at the end.
-   *
-   * @param configuration used to resolve file systems and the filter
-   * @param footers the footers of the files to read
-   * @param maxSplitSize upper split size passed on to split generation
-   * @param minSplitSize lower split size passed on to split generation
-   * @param readContext supplies the requested schema and the read support metadata
-   * @return the splits of all files
-   * @throws IOException if file status or block locations cannot be fetched
-   */
-  List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
-      long maxSplitSize, long minSplitSize, ReadContext readContext)
-      throws IOException {
-    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
-    Filter filter = ParquetInputFormat.getFilter(configuration);
-
-    long rowGroupsDropped = 0;
-    long totalRowGroups = 0;
-
-    for (Footer footer : footers) {
-      final Path file = footer.getFile();
-      LOG.debug(file);
-      FileSystem fs = file.getFileSystem(configuration);
-      FileStatus fileStatus = fs.getFileStatus(file);
-      ParquetMetadata parquetMetaData = footer.getParquetMetadata();
-      List<BlockMetaData> blocks = parquetMetaData.getBlocks();
-
-      totalRowGroups += blocks.size();
-      List<BlockMetaData> filteredBlocks =
-          RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
-      rowGroupsDropped += blocks.size() - filteredBlocks.size();
-
-      if (!filteredBlocks.isEmpty()) {
-        BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-        String requestedSchema = readContext.getRequestedSchema().toString();
-        Map<String, String> readSupportMetadata = readContext.getReadSupportMetadata();
-        List<ParquetInputSplit> fileSplits = generateSplits(
-            filteredBlocks,
-            fileBlockLocations,
-            fileStatus,
-            requestedSchema,
-            readSupportMetadata,
-            minSplitSize,
-            maxSplitSize);
-        splits.addAll(fileSplits);
-      }
-    }
-
-    boolean nothingDropped = rowGroupsDropped <= 0 || totalRowGroups <= 0;
-    if (nothingDropped) {
-      LOG.info("There were no row groups that could be dropped due to filter predicates");
-    } else {
-      int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
-      LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
-    }
-    return splits;
-  }
-
-  /**
-   * groups together all the data blocks for the same HDFS block
-   * and turns every group into an input split
-   *
-   * @param rowGroupBlocks      data blocks (row groups)
-   * @param hdfsBlocksArray     hdfs blocks
-   * @param fileStatus          the containing file
-   * @param requestedSchema     the schema requested by the user
-   * @param readSupportMetadata the metadata provided by the readSupport implementation in init
-   * @param minSplitSize        the mapred.min.split.size
-   * @param maxSplitSize        the mapred.max.split.size
-   * @return the splits (one per HDFS block)
-   * @throws IOException If hosts can't be retrieved for the HDFS block
-   */
-  static <T> List<ParquetInputSplit> generateSplits(
-          List<BlockMetaData> rowGroupBlocks,
-          BlockLocation[] hdfsBlocksArray,
-          FileStatus fileStatus,
-          String requestedSchema,
-          Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
-
-    // first decide which row groups go together ...
-    List<SplitInfo> groupedRowGroups =
-        generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);
-
-    // ... then materialize one split per group
-    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
-    for (int i = 0; i < groupedRowGroups.size(); i++) {
-      SplitInfo group = groupedRowGroups.get(i);
-      resultSplits.add(group.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata));
-    }
-    return resultSplits;
-  }
-
-  /**
-   * Assigns the row groups of one file to splits.
-   * <p>
-   * A new split is started in front of a row group when either the mid point of that row
-   * group falls into a new hdfs block and the current split is non-empty and has reached
-   * minSplitSize, or the current split has reached maxSplitSize.
-   *
-   * @param rowGroupBlocks  the row groups of the file; the first element is accessed
-   *                        unconditionally, so the list must not be empty
-   * @param hdfsBlocksArray the hdfs blocks of the file
-   * @param minSplitSize    lower bound, compared against the compressed size of a split
-   * @param maxSplitSize    upper bound, compared against the compressed size of a split
-   * @return the row groups grouped into splits, in file order
-   * @throws ParquetDecodingException if a size is negative, max is smaller than min, or the
-   *         row groups are not sorted by starting position
-   */
-  static List<SplitInfo> generateSplitInfo(
-      List<BlockMetaData> rowGroupBlocks,
-      BlockLocation[] hdfsBlocksArray,
-      long minSplitSize, long maxSplitSize) {
-    List<SplitInfo> splitRowGroups;
-
-    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
-      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
-    }
-    HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
-    // position the block cursors on the first row group so that the first split is
-    // attributed to the hdfs block that row group starts in
-    hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
-    SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-
-    //assign rowGroups to splits
-    splitRowGroups = new ArrayList<SplitInfo>();
-    checkSorted(rowGroupBlocks);//assert row groups are sorted
-    for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
-      // the block check is evaluated first on every iteration because it also advances
-      // the hdfs block cursors that getCurrentBlock() relies on below
-      if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
-             && currentSplit.getCompressedByteSize() >= minSplitSize
-             && currentSplit.getCompressedByteSize() > 0)
-           || currentSplit.getCompressedByteSize() >= maxSplitSize) {
-        //create a new split
-        splitRowGroups.add(currentSplit);//finish previous split
-        currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-      }
-      currentSplit.addRowGroup(rowGroupMetadata);
-    }
-
-    // flush the split that was still being filled when the loop ended
-    if (currentSplit.getRowGroupCount() > 0) {
-      splitRowGroups.add(currentSplit);
-    }
-
-    return splitRowGroups;
-  }
-
-  /**
-   * Verifies that the row groups are ordered by ascending starting position.
-   *
-   * @param rowGroupBlocks the row groups to check
-   * @throws ParquetDecodingException if a row group starts before its predecessor
-   */
-  private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
-    long previousOffset = 0L;
-    for (BlockMetaData rowGroup : rowGroupBlocks) {
-      long currentOffset = rowGroup.getStartingPos();
-      if (currentOffset < previousOffset) {
-        throw new ParquetDecodingException("row groups are not sorted: previous row groups starts at " + previousOffset + ", current row group starts at " + currentOffset);
-      }
-      // remember this offset; without the update every row group would only ever be
-      // compared against 0 and unsorted input would go undetected
-      previousOffset = currentOffset;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
deleted file mode 100644
index 9bb422c..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-/**
- * An input split for the Parquet format
- * It contains the information to read one block of the file.
- *
- * This class is private to the ParquetInputFormat.
- * Backward compatibility is not maintained.
- *
- * @author Julien Le Dem
- */
-@Private
-public class ParquetInputSplit extends FileSplit implements Writable {
-
-
-  private long end;
-  private long[] rowGroupOffsets;
-
-  /**
-   * Writables must have a parameterless constructor
-   */
-  public ParquetInputSplit() {
-        super(null, 0, 0, new String[0]);
-  }
-
-  /**
-   * For compatibility only
-   * use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[])}
-   * @param path
-   * @param start
-   * @param length
-   * @param hosts
-   * @param blocks
-   * @param requestedSchema
-   * @param fileSchema
-   * @param extraMetadata
-   * @param readSupportMetadata
-   */
-  @Deprecated
-  public ParquetInputSplit(
-      Path path,
-      long start,
-      long length,
-      String[] hosts,
-      List<BlockMetaData> blocks,
-      String requestedSchema,
-      String fileSchema,
-      Map<String, String> extraMetadata,
-      Map<String, String> readSupportMetadata) {
-    this(path, start, length, end(blocks, requestedSchema), hosts, offsets(blocks));
-  }
-
-  private static long end(List<BlockMetaData> blocks, String requestedSchema) {
-    MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
-    long length = 0;
-
-    for (BlockMetaData block : blocks) {
-      List<ColumnChunkMetaData> columns = block.getColumns();
-      for (ColumnChunkMetaData column : columns) {
-        if (requested.containsPath(column.getPath().toArray())) {
-          length += column.getTotalSize();
-        }
-      }
-    }
-    return length;
-  }
-
-  private static long[] offsets(List<BlockMetaData> blocks) {
-    long[] offsets = new long[blocks.size()];
-    for (int i = 0; i < offsets.length; i++) {
-      offsets[i] = blocks.get(i).getStartingPos();
-    }
-    return offsets;
-  }
-
-  /**
-   * @return the block meta data
-   * @deprecated the file footer is no longer read before creating input splits
-   */
-  @Deprecated
-  public List<BlockMetaData> getBlocks() {
-    throw new UnsupportedOperationException(
-        "Splits no longer have row group metadata, see PARQUET-234");
-  }
-
-  /**
-   * Builds a {@code ParquetInputSplit} from a mapreduce {@link FileSplit}.
-   *
-   * @param split a mapreduce FileSplit
-   * @return a ParquetInputSplit
-   * @throws IOException
-   */
-  static ParquetInputSplit from(FileSplit split) throws IOException {
-    return new ParquetInputSplit(split.getPath(),
-        split.getStart(), split.getStart() + split.getLength(),
-        split.getLength(), split.getLocations(), null);
-  }
-
-  /**
-   * Builds a {@code ParquetInputSplit} from a mapred
-   * {@link org.apache.hadoop.mapred.FileSplit}.
-   *
-   * @param split a mapreduce FileSplit
-   * @return a ParquetInputSplit
-   * @throws IOException
-   */
-  static ParquetInputSplit from(org.apache.hadoop.mapred.FileSplit split) throws IOException {
-    return new ParquetInputSplit(split.getPath(),
-        split.getStart(), split.getStart() + split.getLength(),
-        split.getLength(), split.getLocations(), null);
-  }
-
-  /**
-   * @param file the path of the file for that split
-   * @param start the start offset in the file
-   * @param end the end offset in the file
-   * @param length the actual size in bytes that we expect to read
-   * @param hosts the hosts with the replicas of this data
-   * @param rowGroupOffsets the offsets of the rowgroups selected if loaded on the client
-   */
-  public ParquetInputSplit(
-      Path file, long start, long end, long length, String[] hosts,
-      long[] rowGroupOffsets) {
-    super(file, start, length, hosts);
-    this.end = end;
-    this.rowGroupOffsets = rowGroupOffsets;
-  }
-
-  /**
-   * @return the requested schema
-   * @deprecated the file footer is no longer read before creating input splits
-   */
-  @Deprecated
-  String getRequestedSchema() {
-    throw new UnsupportedOperationException(
-        "Splits no longer have the requested schema, see PARQUET-234");
-  }
-
-  /**
-   * @return the file schema
-   * @deprecated the file footer is no longer read before creating input splits
-   */
-  @Deprecated
-  public String getFileSchema() {
-    throw new UnsupportedOperationException(
-        "Splits no longer have the file schema, see PARQUET-234");
-  }
-
-  /**
-   * @return the end offset of that split
-   */
-  public long getEnd() {
-    return end;
-  }
-
-  /**
-   * @return app specific metadata from the file
-   * @deprecated the file footer is no longer read before creating input splits
-   */
-  @Deprecated
-  public Map<String, String> getExtraMetadata() {
-    throw new UnsupportedOperationException(
-        "Splits no longer have file metadata, see PARQUET-234");
-  }
-
-  /**
-   * @return app specific metadata provided by the read support in the init phase
-   */
-  @Deprecated
-  Map<String, String> getReadSupportMetadata() {
-    throw new UnsupportedOperationException(
-        "Splits no longer have read-support metadata, see PARQUET-234");
-  }
-
-  /**
-   * @return the offsets of the row group selected if this has been determined on the client side
-   */
-  public long[] getRowGroupOffsets() {
-    return rowGroupOffsets;
-  }
-
-  @Override
-  public String toString() {
-    String hosts;
-    try{
-       hosts = Arrays.toString(getLocations());
-    } catch (Exception e) {
-      // IOException/InterruptedException could be thrown
-      hosts = "(" + e + ")";
-    }
-
-    return this.getClass().getSimpleName() + "{" +
-           "part: " + getPath()
-        + " start: " + getStart()
-        + " end: " + getEnd()
-        + " length: " + getLength()
-        + " hosts: " + hosts
-        + (rowGroupOffsets == null ? "" : (" row groups: " + Arrays.toString(rowGroupOffsets)))
-        + "}";
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void readFields(DataInput hin) throws IOException {
-    byte[] bytes = readArray(hin);
-    DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)));
-    super.readFields(in);
-    this.end = in.readLong();
-    if (in.readBoolean()) {
-      this.rowGroupOffsets = new long[in.readInt()];
-      for (int i = 0; i < rowGroupOffsets.length; i++) {
-        rowGroupOffsets[i] = in.readLong();
-      }
-    }
-    in.close();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(DataOutput hout) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
-    super.write(out);
-    out.writeLong(end);
-    out.writeBoolean(rowGroupOffsets != null);
-    if (rowGroupOffsets != null) {
-      out.writeInt(rowGroupOffsets.length);
-      for (long o : rowGroupOffsets) {
-        out.writeLong(o);
-      }
-    }
-    out.close();
-    writeArray(hout, baos.toByteArray());
-  }
-
-  private static void writeArray(DataOutput out, byte[] bytes) throws IOException {
-    out.writeInt(bytes.length);
-    out.write(bytes, 0, bytes.length);
-  }
-
-  private static byte[] readArray(DataInput in) throws IOException {
-    int len = in.readInt();
-    byte[] bytes = new byte[len];
-    in.readFully(bytes);
-    return bytes;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
deleted file mode 100644
index 0e0ce42..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-
-import parquet.Log;
-import parquet.hadoop.util.ContextUtil;
-
-public class ParquetOutputCommitter extends FileOutputCommitter {
-  private static final Log LOG = Log.getLog(ParquetOutputCommitter.class);
-
-  private final Path outputPath;
-
-  public ParquetOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
-    super(outputPath, context);
-    this.outputPath = outputPath;
-  }
-
-  public void commitJob(JobContext jobContext) throws IOException {
-    super.commitJob(jobContext);
-    Configuration configuration = ContextUtil.getConfiguration(jobContext);
-    writeMetaDataFile(configuration,outputPath);
-  }
-
-  public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
-    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
-      try {
-        final FileSystem fileSystem = outputPath.getFileSystem(configuration);
-        FileStatus outputStatus = fileSystem.getFileStatus(outputPath);
-        List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus);
-        try {
-          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers);
-        } catch (Exception e) {
-          LOG.warn("could not write summary file for " + outputPath, e);
-          final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE);
-          if (fileSystem.exists(metadataPath)) {
-            fileSystem.delete(metadataPath, true);
-          }
-        }
-      } catch (Exception e) {
-        LOG.warn("could not write summary file for " + outputPath, e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
deleted file mode 100644
index 43648f4..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.INFO;
-import static parquet.Preconditions.checkNotNull;
-import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
-import static parquet.hadoop.util.ContextUtil.getConfiguration;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.api.WriteSupport.WriteContext;
-import parquet.hadoop.codec.CodecConfig;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ConfigurationUtil;
-
-/**
- * OutputFormat to write to a Parquet file
- *
- * It requires a {@link WriteSupport} to convert the actual records to the underlying format.
- * It requires the schema of the incoming records. (provided by the write support)
- * It allows storing extra metadata in the footer (for example: for schema compatibility purpose when converting from a different schema language).
- *
- * The format configuration settings in the job configuration:
- * <pre>
- * # The block size is the size of a row group being buffered in memory
- * # this limits the memory usage when writing
- * # Larger values will improve the IO when reading but consume more memory when writing
- * parquet.block.size=134217728 # in bytes, default = 128 * 1024 * 1024
- *
- * # The page size is for compression. When reading, each page can be decompressed independently.
- * # A block is composed of pages. The page is the smallest unit that must be read fully to access a single record.
- * # If this value is too small, the compression will deteriorate
- * parquet.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
- *
- * # There is one dictionary page per column per row group when dictionary encoding is used.
- * # The dictionary page size works like the page size but for dictionary
- * parquet.dictionary.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
- *
- * # The compression algorithm used to compress pages
- * parquet.compression=UNCOMPRESSED # one of: UNCOMPRESSED, SNAPPY, GZIP, LZO. Default: UNCOMPRESSED. Supersedes mapred.output.compress*
- *
- * # The write support class to convert the records written to the OutputFormat into the events accepted by the record consumer
- * # Usually provided by a specific ParquetOutputFormat subclass
- * parquet.write.support.class= # fully qualified name
- *
- * # To enable/disable dictionary encoding
- * parquet.enable.dictionary=true # false to disable dictionary encoding
- *
- * # To enable/disable summary metadata aggregation at the end of a MR job
- * # The default is true (enabled)
- * parquet.enable.summary-metadata=true # false to disable summary aggregation
- * </pre>
- *
- * If parquet.compression is not set, the following properties are checked (FileOutputFormat behavior).
- * Note that we explicitely disallow custom Codecs
- * <pre>
- * mapred.output.compress=true
- * mapred.output.compression.codec=org.apache.hadoop.io.compress.SomeCodec # the codec must be one of Snappy, GZip or LZO
- * </pre>
- *
- * if none of those is set the data is uncompressed.
- *
- * @author Julien Le Dem
- *
- * @param <T> the type of the materialized records
- */
-public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
-  private static final Log LOG = Log.getLog(ParquetOutputFormat.class);
-
-  public static final String BLOCK_SIZE           = "parquet.block.size";
-  public static final String PAGE_SIZE            = "parquet.page.size";
-  public static final String COMPRESSION          = "parquet.compression";
-  public static final String WRITE_SUPPORT_CLASS  = "parquet.write.support.class";
-  public static final String DICTIONARY_PAGE_SIZE = "parquet.dictionary.page.size";
-  public static final String ENABLE_DICTIONARY    = "parquet.enable.dictionary";
-  public static final String VALIDATION           = "parquet.validation";
-  public static final String WRITER_VERSION       = "parquet.writer.version";
-  public static final String ENABLE_JOB_SUMMARY   = "parquet.enable.summary-metadata";
-  public static final String MEMORY_POOL_RATIO    = "parquet.memory.pool.ratio";
-  public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
-
-  public static void setWriteSupportClass(Job job,  Class<?> writeSupportClass) {
-    getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
-  }
-
-  public static void setWriteSupportClass(JobConf job, Class<?> writeSupportClass) {
-      job.set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
-  }
-
-  public static Class<?> getWriteSupportClass(Configuration configuration) {
-    final String className = configuration.get(WRITE_SUPPORT_CLASS);
-    if (className == null) {
-      return null;
-    }
-    final Class<?> writeSupportClass = ConfigurationUtil.getClassFromConfig(configuration, WRITE_SUPPORT_CLASS, WriteSupport.class);
-    return writeSupportClass;
-  }
-
-  public static void setBlockSize(Job job, int blockSize) {
-    getConfiguration(job).setInt(BLOCK_SIZE, blockSize);
-  }
-
-  public static void setPageSize(Job job, int pageSize) {
-    getConfiguration(job).setInt(PAGE_SIZE, pageSize);
-  }
-
-  public static void setDictionaryPageSize(Job job, int pageSize) {
-    getConfiguration(job).setInt(DICTIONARY_PAGE_SIZE, pageSize);
-  }
-
-  public static void setCompression(Job job, CompressionCodecName compression) {
-    getConfiguration(job).set(COMPRESSION, compression.name());
-  }
-
-  public static void setEnableDictionary(Job job, boolean enableDictionary) {
-    getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary);
-  }
-
-  public static boolean getEnableDictionary(JobContext jobContext) {
-    return getEnableDictionary(getConfiguration(jobContext));
-  }
-
-  public static int getBlockSize(JobContext jobContext) {
-    return getBlockSize(getConfiguration(jobContext));
-  }
-
-  public static int getPageSize(JobContext jobContext) {
-    return getPageSize(getConfiguration(jobContext));
-  }
-
-  public static int getDictionaryPageSize(JobContext jobContext) {
-    return getDictionaryPageSize(getConfiguration(jobContext));
-  }
-
-  public static CompressionCodecName getCompression(JobContext jobContext) {
-    return getCompression(getConfiguration(jobContext));
-  }
-
-  public static boolean isCompressionSet(JobContext jobContext) {
-    return isCompressionSet(getConfiguration(jobContext));
-  }
-
-  public static void setValidation(JobContext jobContext, boolean validating) {
-    setValidation(getConfiguration(jobContext), validating);
-  }
-
-  public static boolean getValidation(JobContext jobContext) {
-    return getValidation(getConfiguration(jobContext));
-  }
-
-  public static boolean getEnableDictionary(Configuration configuration) {
-    return configuration.getBoolean(ENABLE_DICTIONARY, true);
-  }
-
-  @Deprecated
-  public static int getBlockSize(Configuration configuration) {
-    return configuration.getInt(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
-  }
-
-  public static long getLongBlockSize(Configuration configuration) {
-    return configuration.getLong(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
-  }
-
-  public static int getPageSize(Configuration configuration) {
-    return configuration.getInt(PAGE_SIZE, DEFAULT_PAGE_SIZE);
-  }
-
-  public static int getDictionaryPageSize(Configuration configuration) {
-    return configuration.getInt(DICTIONARY_PAGE_SIZE, DEFAULT_PAGE_SIZE);
-  }
-
-  public static WriterVersion getWriterVersion(Configuration configuration) {
-    String writerVersion = configuration.get(WRITER_VERSION, WriterVersion.PARQUET_1_0.toString());
-    return WriterVersion.fromString(writerVersion);
-  }
-
-  public static CompressionCodecName getCompression(Configuration configuration) {
-    return CodecConfig.getParquetCompressionCodec(configuration);
-  }
-
-  public static boolean isCompressionSet(Configuration configuration) {
-    return CodecConfig.isParquetCompressionSet(configuration);
-  }
-
-  public static void setValidation(Configuration configuration, boolean validating) {
-    configuration.setBoolean(VALIDATION, validating);
-  }
-
-  public static boolean getValidation(Configuration configuration) {
-    return configuration.getBoolean(VALIDATION, false);
-  }
-
-  private CompressionCodecName getCodec(TaskAttemptContext taskAttemptContext) {
-    return CodecConfig.from(taskAttemptContext).getCodec();
-  }
-
-
-
-  private WriteSupport<T> writeSupport;
-  private ParquetOutputCommitter committer;
-
-  /**
-   * constructor used when this OutputFormat in wrapped in another one (In Pig for example)
-   * @param writeSupportClass the class used to convert the incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to be stored in the footer of the file
-   */
-  public <S extends WriteSupport<T>> ParquetOutputFormat(S writeSupport) {
-    this.writeSupport = writeSupport;
-  }
-
-  /**
-   * used when directly using the output format and configuring the write support implementation
-   * using parquet.write.support.class
-   */
-  public <S extends WriteSupport<T>> ParquetOutputFormat() {
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-
-    final Configuration conf = getConfiguration(taskAttemptContext);
-
-    CompressionCodecName codec = getCodec(taskAttemptContext);
-    String extension = codec.getExtension() + ".parquet";
-    Path file = getDefaultWorkFile(taskAttemptContext, extension);
-    return getRecordWriter(conf, file, codec);
-  }
-
-  public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext, Path file)
-      throws IOException, InterruptedException {
-    return getRecordWriter(getConfiguration(taskAttemptContext), file, getCodec(taskAttemptContext));
-  }
-
-  public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
-        throws IOException, InterruptedException {
-    final WriteSupport<T> writeSupport = getWriteSupport(conf);
-
-    CodecFactory codecFactory = new CodecFactory(conf);
-    long blockSize = getLongBlockSize(conf);
-    if (INFO) LOG.info("Parquet block size to " + blockSize);
-    int pageSize = getPageSize(conf);
-    if (INFO) LOG.info("Parquet page size to " + pageSize);
-    int dictionaryPageSize = getDictionaryPageSize(conf);
-    if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
-    boolean enableDictionary = getEnableDictionary(conf);
-    if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
-    boolean validating = getValidation(conf);
-    if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
-    WriterVersion writerVersion = getWriterVersion(conf);
-    if (INFO) LOG.info("Writer version is: " + writerVersion);
-
-    WriteContext init = writeSupport.init(conf);
-    ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
-    w.start();
-
-    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
-        MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
-    long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
-        MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
-    if (memoryManager == null) {
-      memoryManager = new MemoryManager(maxLoad, minAllocation);
-    } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
-      LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
-          "be reset by the new value: " + maxLoad);
-    }
-
-    return new ParquetRecordWriter<T>(
-        w,
-        writeSupport,
-        init.getSchema(),
-        init.getExtraMetaData(),
-        blockSize, pageSize,
-        codecFactory.getCompressor(codec, pageSize),
-        dictionaryPageSize,
-        enableDictionary,
-        validating,
-        writerVersion,
-        memoryManager);
-  }
-
-  /**
-   * @param configuration to find the configuration for the write support class
-   * @return the configured write support
-   */
-  @SuppressWarnings("unchecked")
-  public WriteSupport<T> getWriteSupport(Configuration configuration){
-    if (writeSupport != null) return writeSupport;
-    Class<?> writeSupportClass = getWriteSupportClass(configuration);
-    try {
-      return (WriteSupport<T>)checkNotNull(writeSupportClass, "writeSupportClass").newInstance();
-    } catch (InstantiationException e) {
-      throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
-    } catch (IllegalAccessException e) {
-      throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
-    }
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException {
-    if (committer == null) {
-      Path output = getOutputPath(context);
-      committer = new ParquetOutputCommitter(output, context);
-    }
-    return committer;
-  }
-
-
-  /**
-   * This memory manager is for all the real writers (InternalParquetRecordWriter) in one task.
-   */
-  private static MemoryManager memoryManager;
-
-  static MemoryManager getMemoryManager() {
-    return memoryManager;
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
deleted file mode 100644
index 7c3ecbb..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Preconditions.checkNotNull;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.filter2.compat.RowGroupFilter;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.HiddenFileFilter;
-import parquet.schema.MessageType;
-
-/**
- * Read records from a Parquet file.
- * TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
- */
-public class ParquetReader<T> implements Closeable {
-
-  private final ReadSupport<T> readSupport;
-  private final Configuration conf;
-  private final Iterator<Footer> footersIterator;
-  private final Filter filter;
-
-  private InternalParquetRecordReader<T> reader;
-
-  /**
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @throws IOException
-   * @deprecated use {@link #builder(ReadSupport, Path)}
-   */
-  @Deprecated
-  public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
-    this(new Configuration(), file, readSupport, FilterCompat.NOOP);
-  }
-
-  /**
-   * @param conf the configuration
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @throws IOException
-   * @deprecated use {@link #builder(ReadSupport, Path)}
-   */
-  @Deprecated
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
-    this(conf, file, readSupport, FilterCompat.NOOP);
-  }
-
-  /**
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @param unboundRecordFilter the filter to use to filter records
-   * @throws IOException
-   * @deprecated use {@link #builder(ReadSupport, Path)}
-   */
-  @Deprecated
-  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
-    this(new Configuration(), file, readSupport, FilterCompat.get(unboundRecordFilter));
-  }
-
-  /**
-   * @param conf the configuration
-   * @param file the file to read
-   * @param readSupport to materialize records
-   * @param unboundRecordFilter the filter to use to filter records
-   * @throws IOException
-   * @deprecated use {@link #builder(ReadSupport, Path)}
-   */
-  @Deprecated
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
-    this(conf, file, readSupport, FilterCompat.get(unboundRecordFilter));
-  }
-
-  private ParquetReader(Configuration conf,
-                       Path file,
-                       ReadSupport<T> readSupport,
-                       Filter filter) throws IOException {
-    this.readSupport = readSupport;
-    this.filter = checkNotNull(filter, "filter");
-    this.conf = conf;
-
-    FileSystem fs = file.getFileSystem(conf);
-    List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, HiddenFileFilter.INSTANCE));
-    List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
-    this.footersIterator = footers.iterator();
-  }
-
-  /**
-   * @return the next record or null if finished
-   * @throws IOException
-   */
-  public T read() throws IOException {
-    try {
-      if (reader != null && reader.nextKeyValue()) {
-        return reader.getCurrentValue();
-      } else {
-        initReader();
-        return reader == null ? null : read();
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private void initReader() throws IOException {
-    if (reader != null) {
-      reader.close();
-      reader = null;
-    }
-    if (footersIterator.hasNext()) {
-      Footer footer = footersIterator.next();
-
-      List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
-
-      MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
-
-      List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
-          filter, blocks, fileSchema);
-
-      reader = new InternalParquetRecordReader<T>(readSupport, filter);
-      reader.initialize(fileSchema,
-          footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
-          footer.getFile(), filteredBlocks, conf);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-
-  public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
-    return new Builder<T>(readSupport, path);
-  }
-
-  public static class Builder<T> {
-    private final ReadSupport<T> readSupport;
-    private final Path file;
-    private Configuration conf;
-    private Filter filter;
-
-    private Builder(ReadSupport<T> readSupport, Path path) {
-      this.readSupport = checkNotNull(readSupport, "readSupport");
-      this.file = checkNotNull(path, "path");
-      this.conf = new Configuration();
-      this.filter = FilterCompat.NOOP;
-    }
-
-    public Builder<T> withConf(Configuration conf) {
-      this.conf = checkNotNull(conf, "conf");
-      return this;
-    }
-
-    public Builder<T> withFilter(Filter filter) {
-      this.filter = checkNotNull(filter, "filter");
-      return this;
-    }
-
-    public ParquetReader<T> build() throws IOException {
-      return new ParquetReader<T>(conf, file, readSupport, filter);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
deleted file mode 100644
index abf65c1..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.filter2.compat.RowGroupFilter.filterRowGroups;
-import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static parquet.format.converter.ParquetMetadataConverter.range;
-import static parquet.hadoop.ParquetFileReader.readFooter;
-import static parquet.hadoop.ParquetInputFormat.getFilter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import parquet.Log;
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.hadoop.util.ContextUtil;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.schema.MessageType;
-
-/**
- * Hadoop {@link RecordReader} that reads the records from a block of a Parquet file.
- * All the reading is delegated to an {@link InternalParquetRecordReader}; this class
- * only works out which row groups of the split have to be read.
- *
- * @see ParquetInputFormat
- *
- * @author Julien Le Dem
- *
- * @param <T> type of the materialized records
- */
-public class ParquetRecordReader<T> extends RecordReader<Void, T> {
-
-  private static final Log LOG = Log.getLog(ParquetRecordReader.class);
-  private final InternalParquetRecordReader<T> internalReader;
-
-  /**
-   * Creates a reader that does not filter (uses {@link FilterCompat#NOOP}).
-   *
-   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
-   */
-  public ParquetRecordReader(ReadSupport<T> readSupport) {
-    this(readSupport, FilterCompat.NOOP);
-  }
-
-  /**
-   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
-   * @param filter for filtering individual records
-   */
-  public ParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
-    internalReader = new InternalParquetRecordReader<T>(readSupport, filter);
-  }
-
-  /**
-   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
-   * @param filter for filtering individual records
-   * @deprecated use {@link #ParquetRecordReader(ReadSupport, Filter)}
-   */
-  @Deprecated
-  public ParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
-    this(readSupport, FilterCompat.get(filter));
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void close() throws IOException {
-    internalReader.close();
-  }
-
-  /**
-   * always returns null
-   */
-  @Override
-  public Void getCurrentKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public T getCurrentValue() throws IOException,
-  InterruptedException {
-    return internalReader.getCurrentValue();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return internalReader.getProgress();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    if (!(context instanceof TaskInputOutputContext<?, ?, ?, ?>)) {
-      // counters are only available from a TaskInputOutputContext; reading goes on without them
-      LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
-              + context.getClass().getCanonicalName());
-    } else {
-      BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>) context);
-    }
-
-    ParquetInputSplit parquetSplit = toParquetSplit(inputSplit);
-    Configuration configuration = ContextUtil.getConfiguration(context);
-    initializeInternalReader(parquetSplit, configuration);
-  }
-
-  /**
-   * Variant of {@link #initialize(InputSplit, TaskAttemptContext)} that takes the
-   * configuration directly and initializes the counters from a {@link Reporter}.
-   *
-   * @param inputSplit the split to read
-   * @param configuration the configuration to read with
-   * @param reporter the reporter the counters are initialized from
-   */
-  public void initialize(InputSplit inputSplit, Configuration configuration, Reporter reporter)
-      throws IOException, InterruptedException {
-    BenchmarkCounter.initCounterFromReporter(reporter,configuration);
-    ParquetInputSplit parquetSplit = toParquetSplit(inputSplit);
-    initializeInternalReader(parquetSplit, configuration);
-  }
-
-  /**
-   * Reads the footer of the file of the split, determines the row groups to read and
-   * initializes the internal reader with them.
-   */
-  private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
-    final Path path = split.getPath();
-    final long[] rowGroupOffsets = split.getRowGroupOffsets();
-    final ParquetMetadata footer;
-    final List<BlockMetaData> selectedBlocks;
-    if (rowGroupOffsets != null) {
-      // the row groups were selected on the client: find them in the full footer
-      footer = readFooter(configuration, path, NO_FILTER);
-      selectedBlocks = selectBlocksByOffset(rowGroupOffsets, footer, split);
-    } else {
-      // rowGroupOffsets is null when task.side.metadata is set:
-      // the predicate push down filter has to be applied here
-      footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
-      MessageType schemaForFilter = footer.getFileMetaData().getSchema();
-      Filter filter = getFilter(configuration);
-      selectedBlocks = filterRowGroups(filter, footer.getBlocks(), schemaForFilter);
-    }
-    internalReader.initialize(
-        footer.getFileMetaData().getSchema(),
-        footer.getFileMetaData().getKeyValueMetaData(),
-        path, selectedBlocks, configuration);
-  }
-
-  /**
-   * Picks from the footer the blocks whose starting position is one of the given offsets.
-   *
-   * @throws IllegalStateException if the number of blocks found differs from the number of offsets
-   */
-  private static List<BlockMetaData> selectBlocksByOffset(
-      long[] rowGroupOffsets, ParquetMetadata footer, ParquetInputSplit split) {
-    Set<Long> wantedOffsets = new HashSet<Long>();
-    for (int i = 0; i < rowGroupOffsets.length; i++) {
-      wantedOffsets.add(rowGroupOffsets[i]);
-    }
-    List<BlockMetaData> matches = new ArrayList<BlockMetaData>();
-    for (BlockMetaData candidate : footer.getBlocks()) {
-      if (wantedOffsets.contains(candidate.getStartingPos())) {
-        matches.add(candidate);
-      }
-    }
-    // verify we found them all
-    if (matches.size() == rowGroupOffsets.length) {
-      return matches;
-    }
-    // this should never happen.
-    // provide a good error message in case there's a bug
-    long[] offsetsInFile = new long[footer.getBlocks().size()];
-    int next = 0;
-    while (next < offsetsInFile.length) {
-      offsetsInFile[next] = footer.getBlocks().get(next).getStartingPos();
-      next++;
-    }
-    throw new IllegalStateException(
-        "All the offsets listed in the split should be found in the file."
-        + " expected: " + Arrays.toString(rowGroupOffsets)
-        + " found: " + matches
-        + " out of: " + Arrays.toString(offsetsInFile)
-        + " in range " + split.getStart() + ", " + split.getEnd());
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return internalReader.nextKeyValue();
-  }
-
-  /**
-   * Converts the supported kinds of split (ParquetInputSplit, mapreduce FileSplit and
-   * mapred FileSplit) to a ParquetInputSplit.
-   *
-   * @throws IllegalArgumentException for any other kind of split
-   */
-  private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
-    if (split instanceof ParquetInputSplit) {
-      return (ParquetInputSplit) split;
-    }
-    if (split instanceof FileSplit) {
-      return ParquetInputSplit.from((FileSplit) split);
-    }
-    if (split instanceof org.apache.hadoop.mapred.FileSplit) {
-      return ParquetInputSplit.from(
-          (org.apache.hadoop.mapred.FileSplit) split);
-    }
-    throw new IllegalArgumentException(
-        "Invalid split (not a FileSplit or ParquetInputSplit): " + split);
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
deleted file mode 100644
index 968a334..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.api.WriteSupport;
-import parquet.schema.MessageType;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Writes records to a Parquet file.
- * The writing itself is delegated to an {@link InternalParquetRecordWriter}, which is
- * registered with a {@link MemoryManager} when one is given at construction.
- *
- * @see ParquetOutputFormat
- *
- * @author Julien Le Dem
- *
- * @param <T> the type of the materialized records
- */
-public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
-
-  private InternalParquetRecordWriter<T> internalWriter;
-  // stays null when the deprecated constructor is used
-  private MemoryManager memoryManager;
-
-  /**
-   * Creates a writer that is not registered with any memory manager.
-   *
-   * @param w the file to write to
-   * @param writeSupport the class to convert incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to write in the footer of the file
-   * @param blockSize the size of a block in the file (this will be approximate)
-   * @param pageSize the page size handed to the internal writer
-   * @param compressor the compressor used to compress the pages
-   * @param dictionaryPageSize the threshold for dictionary size
-   * @param enableDictionary to enable the dictionary
-   * @param validating if schema validation should be turned on
-   * @param writerVersion the writer version handed to the internal writer
-   * @deprecated use the constructor that also takes a {@link MemoryManager}
-   */
-  @Deprecated
-  public ParquetRecordWriter(
-      ParquetFileWriter w,
-      WriteSupport<T> writeSupport,
-      MessageType schema,
-      Map<String, String> extraMetaData,
-      int blockSize, int pageSize,
-      BytesCompressor compressor,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      WriterVersion writerVersion) {
-    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
-        validating, writerVersion);
-  }
-
-  /**
-   * Creates a writer and registers it, with its block size, with the given memory manager.
-   *
-   * @param w the file to write to
-   * @param writeSupport the class to convert incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to write in the footer of the file
-   * @param blockSize the size of a block in the file (this will be approximate)
-   * @param pageSize the page size handed to the internal writer
-   * @param compressor the compressor used to compress the pages
-   * @param dictionaryPageSize the threshold for dictionary size
-   * @param enableDictionary to enable the dictionary
-   * @param validating if schema validation should be turned on
-   * @param writerVersion the writer version handed to the internal writer
-   * @param memoryManager the memory manager this writer is registered with, must not be null
-   */
-  public ParquetRecordWriter(
-      ParquetFileWriter w,
-      WriteSupport<T> writeSupport,
-      MessageType schema,
-      Map<String, String> extraMetaData,
-      long blockSize, int pageSize,
-      BytesCompressor compressor,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      WriterVersion writerVersion,
-      MemoryManager memoryManager) {
-    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
-        validating, writerVersion);
-    this.memoryManager = checkNotNull(memoryManager, "memoryManager");
-    memoryManager.addWriter(internalWriter, blockSize);
-  }
-
-  /**
-   * Closes the internal writer and then removes it from the memory manager, if there is one.
-   * The writer is removed even when closing it fails, so that a failed writer does not
-   * stay registered with the memory manager.
-   */
-  @Override
-  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-    try {
-      internalWriter.close();
-    } finally {
-      // unregister only after the close attempt, whether or not it succeeded
-      if (memoryManager != null) {
-        memoryManager.removeWriter(internalWriter);
-      }
-    }
-  }
-
-  /**
-   * Writes the value through the internal writer; the key is ignored.
-   */
-  @Override
-  public void write(Void key, T value) throws IOException, InterruptedException {
-    internalWriter.write(value);
-  }
-
-}
\ No newline at end of file