Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:05 UTC
[08/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
deleted file mode 100644
index 6b7b9b0..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ /dev/null
@@ -1,778 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static java.lang.Boolean.TRUE;
-import static parquet.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import parquet.Log;
-import parquet.Preconditions;
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.filter2.compat.RowGroupFilter;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.api.ReadSupport.ReadContext;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.hadoop.util.ConfigurationUtil;
-import parquet.hadoop.util.ContextUtil;
-import parquet.hadoop.util.HiddenFileFilter;
-import parquet.hadoop.util.SerializationUtil;
-import parquet.io.ParquetDecodingException;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-/**
- * The input format to read a Parquet file.
- *
- * It requires an implementation of {@link ReadSupport} to materialize the records.
- *
- * The requestedSchema will control how the original records get projected by the loader.
- * It must be a subset of the original schema. Only the columns needed to reconstruct the records with the requestedSchema will be scanned.
- *
- * @see #READ_SUPPORT_CLASS
- * @see #UNBOUND_RECORD_FILTER
- * @see #STRICT_TYPE_CHECKING
- * @see #FILTER_PREDICATE
- * @see #TASK_SIDE_METADATA
- *
- * @author Julien Le Dem
- *
- * @param <T> the type of the materialized records
- */
-public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
-
- private static final Log LOG = Log.getLog(ParquetInputFormat.class);
-
- /**
- * key to configure the ReadSupport implementation
- */
- public static final String READ_SUPPORT_CLASS = "parquet.read.support.class";
-
- /**
- * key to configure the filter
- */
- public static final String UNBOUND_RECORD_FILTER = "parquet.read.filter";
-
- /**
- * key to configure type checking for conflicting schemas (default: true)
- */
- public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";
-
- /**
- * key to configure the filter predicate
- */
- public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
-
- /**
- * key to turn on or off task side metadata loading (default: true)
- * if true, metadata is read on the task side and some tasks may finish immediately.
- * if false, metadata is read on the client, which is slower when there is a lot of metadata, but tasks are only spawned if there is work to do.
- */
- public static final String TASK_SIDE_METADATA = "parquet.task.side.metadata";
-
- private static final int MIN_FOOTER_CACHE_SIZE = 100;
-
- public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) {
- ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
- }
-
- public static boolean isTaskSideMetaData(Configuration configuration) {
- return configuration.getBoolean(TASK_SIDE_METADATA, TRUE);
- }
-
- public static void setReadSupportClass(Job job, Class<?> readSupportClass) {
- ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
- }
-
- public static void setUnboundRecordFilter(Job job, Class<? extends UnboundRecordFilter> filterClass) {
- Configuration conf = ContextUtil.getConfiguration(job);
- checkArgument(getFilterPredicate(conf) == null,
- "You cannot provide an UnboundRecordFilter after providing a FilterPredicate");
-
- conf.set(UNBOUND_RECORD_FILTER, filterClass.getName());
- }
-
- /**
- * @deprecated use {@link #getFilter(Configuration)}
- */
- @Deprecated
- public static Class<?> getUnboundRecordFilter(Configuration configuration) {
- return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
- }
-
- private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
- Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
- if (clazz == null) { return null; }
-
- try {
- UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
-
- if (unboundRecordFilter instanceof Configurable) {
- ((Configurable)unboundRecordFilter).setConf(configuration);
- }
-
- return unboundRecordFilter;
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate unbound record filter class", e);
- } catch (IllegalAccessException e) {
- throw new BadConfigurationException("could not instantiate unbound record filter class", e);
- }
- }
-
- public static void setReadSupportClass(JobConf conf, Class<?> readSupportClass) {
- conf.set(READ_SUPPORT_CLASS, readSupportClass.getName());
- }
-
- public static Class<?> getReadSupportClass(Configuration configuration) {
- return ConfigurationUtil.getClassFromConfig(configuration, READ_SUPPORT_CLASS, ReadSupport.class);
- }
-
- public static void setFilterPredicate(Configuration configuration, FilterPredicate filterPredicate) {
- checkArgument(getUnboundRecordFilter(configuration) == null,
- "You cannot provide a FilterPredicate after providing an UnboundRecordFilter");
-
- configuration.set(FILTER_PREDICATE + ".human.readable", filterPredicate.toString());
- try {
- SerializationUtil.writeObjectToConfAsBase64(FILTER_PREDICATE, filterPredicate, configuration);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static FilterPredicate getFilterPredicate(Configuration configuration) {
- try {
- return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Returns a non-null Filter, which is a wrapper around either a
- * FilterPredicate, an UnboundRecordFilter, or a no-op filter.
- */
- public static Filter getFilter(Configuration conf) {
- return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
- }
-
- private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
-
- private final Class<? extends ReadSupport<T>> readSupportClass;
-
- /**
- * Hadoop will instantiate using this constructor
- */
- public ParquetInputFormat() {
- this.readSupportClass = null;
- }
-
- /**
- * Constructor for subclasses, such as AvroParquetInputFormat, or wrappers.
- * <p>
- * Subclasses and wrappers may use this constructor to set the ReadSupport
- * class that will be used when reading instead of requiring the user to set
- * the read support property in their configuration.
- *
- * @param readSupportClass a ReadSupport subclass
- */
- public <S extends ReadSupport<T>> ParquetInputFormat(Class<S> readSupportClass) {
- this.readSupportClass = readSupportClass;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public RecordReader<Void, T> createRecordReader(
- InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
- ReadSupport<T> readSupport = getReadSupport(conf);
- return new ParquetRecordReader<T>(readSupport, getFilter(conf));
- }
-
- /**
- * @param configuration to find the configuration for the read support
- * @return the configured read support
- * @deprecated use getReadSupportInstance static methods instead
- */
- @Deprecated
- @SuppressWarnings("unchecked")
- ReadSupport<T> getReadSupport(Configuration configuration){
- return getReadSupportInstance(readSupportClass == null ?
- (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration) :
- readSupportClass);
- }
-
- /**
- * @param configuration to find the configuration for the read support
- * @return the configured read support
- */
- @SuppressWarnings("unchecked")
- public static <T> ReadSupport<T> getReadSupportInstance(Configuration configuration){
- return getReadSupportInstance(
- (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration));
- }
-
- /**
- * @param readSupportClass to instantiate
- * @return the configured read support
- */
- @SuppressWarnings("unchecked")
- static <T> ReadSupport<T> getReadSupportInstance(
- Class<? extends ReadSupport<T>> readSupportClass){
- try {
- return readSupportClass.newInstance();
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate read support class", e);
- } catch (IllegalAccessException e) {
- throw new BadConfigurationException("could not instantiate read support class", e);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
- Configuration configuration = ContextUtil.getConfiguration(jobContext);
- List<InputSplit> splits = new ArrayList<InputSplit>();
-
- if (isTaskSideMetaData(configuration)) {
- // Although not required by the API, some clients may depend on always
- // receiving ParquetInputSplit. Translation is required at some point.
- for (InputSplit split : super.getSplits(jobContext)) {
- Preconditions.checkArgument(split instanceof FileSplit,
- "Cannot wrap non-FileSplit: " + split);
- splits.add(ParquetInputSplit.from((FileSplit) split));
- }
- return splits;
-
- } else {
- splits.addAll(getSplits(configuration, getFooters(jobContext)));
- }
-
- return splits;
- }
-
- /**
- * @param configuration the configuration to connect to the file system
- * @param footers the footers of the files to read
- * @return the splits for the footers
- * @throws IOException
- * @deprecated split planning using file footers will be removed
- */
- @Deprecated
- public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
- boolean strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
- final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
- final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
- if (maxSplitSize < 0 || minSplitSize < 0) {
- throw new ParquetDecodingException("maxSplitSize or minSplitSize should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
- }
- GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers, strictTypeChecking);
- ReadContext readContext = getReadSupport(configuration).init(new InitContext(
- configuration,
- globalMetaData.getKeyValueMetaData(),
- globalMetaData.getSchema()));
-
- return new ClientSideMetadataSplitStrategy().getSplits(
- configuration, footers, maxSplitSize, minSplitSize, readContext);
- }
-
- /*
- * This is to support multi-level/recursive directory listing until
- * MAPREDUCE-1577 is fixed.
- */
- @Override
- protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
- return getAllFileRecursively(super.listStatus(jobContext),
- ContextUtil.getConfiguration(jobContext));
- }
-
- private static List<FileStatus> getAllFileRecursively(
- List<FileStatus> files, Configuration conf) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- for (FileStatus file : files) {
- if (file.isDir()) {
- Path p = file.getPath();
- FileSystem fs = p.getFileSystem(conf);
- staticAddInputPathRecursively(result, fs, p, HiddenFileFilter.INSTANCE);
- } else {
- result.add(file);
- }
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- private static void staticAddInputPathRecursively(List<FileStatus> result,
- FileSystem fs, Path path, PathFilter inputFilter)
- throws IOException {
- for (FileStatus stat: fs.listStatus(path, inputFilter)) {
- if (stat.isDir()) {
- staticAddInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
- }
- }
- }
-
- /**
- * @param jobContext the current job context
- * @return the footers for the files
- * @throws IOException
- */
- public List<Footer> getFooters(JobContext jobContext) throws IOException {
- List<FileStatus> statuses = listStatus(jobContext);
- if (statuses.isEmpty()) {
- return Collections.emptyList();
- }
- Configuration config = ContextUtil.getConfiguration(jobContext);
- List<Footer> footers = new ArrayList<Footer>(statuses.size());
- Set<FileStatus> missingStatuses = new HashSet<FileStatus>();
- Map<Path, FileStatusWrapper> missingStatusesMap =
- new HashMap<Path, FileStatusWrapper>(missingStatuses.size());
-
- if (footersCache == null) {
- footersCache =
- new LruCache<FileStatusWrapper, FootersCacheValue>(Math.max(statuses.size(), MIN_FOOTER_CACHE_SIZE));
- }
- for (FileStatus status : statuses) {
- FileStatusWrapper statusWrapper = new FileStatusWrapper(status);
- FootersCacheValue cacheEntry =
- footersCache.getCurrentValue(statusWrapper);
- if (Log.DEBUG) {
- LOG.debug("Cache entry " + (cacheEntry == null ? "not " : "")
- + " found for '" + status.getPath() + "'");
- }
- if (cacheEntry != null) {
- footers.add(cacheEntry.getFooter());
- } else {
- missingStatuses.add(status);
- missingStatusesMap.put(status.getPath(), statusWrapper);
- }
- }
- if (Log.DEBUG) {
- LOG.debug("found " + footers.size() + " footers in cache and adding up "
- + "to " + missingStatuses.size() + " missing footers to the cache");
- }
-
-
- if (missingStatuses.isEmpty()) {
- return footers;
- }
-
- List<Footer> newFooters = getFooters(config, missingStatuses);
- for (Footer newFooter : newFooters) {
- // Use the original file status objects to make sure we store a
- // conservative (older) modification time (i.e. in case the files and
- // footers were modified and it's not clear which version of the footers
- // we have)
- FileStatusWrapper fileStatus = missingStatusesMap.get(newFooter.getFile());
- footersCache.put(fileStatus, new FootersCacheValue(fileStatus, newFooter));
- }
-
- footers.addAll(newFooters);
- return footers;
- }
-
- public List<Footer> getFooters(Configuration configuration, List<FileStatus> statuses) throws IOException {
- return getFooters(configuration, (Collection<FileStatus>)statuses);
- }
-
- /**
- * the footers for the files
- * @param configuration to connect to the file system
- * @param statuses the files to open
- * @return the footers of the files
- * @throws IOException
- */
- public List<Footer> getFooters(Configuration configuration, Collection<FileStatus> statuses) throws IOException {
- if (Log.DEBUG) LOG.debug("reading " + statuses.size() + " files");
- boolean taskSideMetaData = isTaskSideMetaData(configuration);
- return ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, taskSideMetaData);
- }
-
- /**
- * @param jobContext the current job context
- * @return the merged metadata from the footers
- * @throws IOException
- */
- public GlobalMetaData getGlobalMetaData(JobContext jobContext) throws IOException {
- return ParquetFileWriter.getGlobalMetaData(getFooters(jobContext));
- }
-
- /**
- * A simple wrapper around {@link parquet.hadoop.Footer} that also includes a
- * modification time associated with that footer. The modification time is
- * used to determine whether the footer is still current.
- */
- static final class FootersCacheValue
- implements LruCache.Value<FileStatusWrapper, FootersCacheValue> {
- private final long modificationTime;
- private final Footer footer;
-
- public FootersCacheValue(FileStatusWrapper status, Footer footer) {
- this.modificationTime = status.getModificationTime();
- this.footer = new Footer(footer.getFile(), footer.getParquetMetadata());
- }
-
- @Override
- public boolean isCurrent(FileStatusWrapper key) {
- long currentModTime = key.getModificationTime();
- boolean isCurrent = modificationTime >= currentModTime;
- if (Log.DEBUG && !isCurrent) {
- LOG.debug("The cache value for '" + key + "' is not current: "
- + "cached modification time=" + modificationTime + ", "
- + "current modification time: " + currentModTime);
- }
- return isCurrent;
- }
-
- public Footer getFooter() {
- return footer;
- }
-
- @Override
- public boolean isNewerThan(FootersCacheValue otherValue) {
- return otherValue == null ||
- modificationTime > otherValue.modificationTime;
- }
-
- public Path getPath() {
- return footer.getFile();
- }
- }
-
- /**
- * A simple wrapper around {@link org.apache.hadoop.fs.FileStatus} with a
- * meaningful "toString()" method
- */
- static final class FileStatusWrapper {
- private final FileStatus status;
- public FileStatusWrapper(FileStatus fileStatus) {
- if (fileStatus == null) {
- throw new IllegalArgumentException("FileStatus object cannot be null");
- }
- status = fileStatus;
- }
-
- public long getModificationTime() {
- return status.getModificationTime();
- }
-
- @Override
- public int hashCode() {
- return status.hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- return other instanceof FileStatusWrapper &&
- status.equals(((FileStatusWrapper) other).status);
- }
-
- @Override
- public String toString() {
- return status.getPath().toString();
- }
- }
-
-}
-
-class ClientSideMetadataSplitStrategy {
- //Wrapper of hdfs blocks, keep track of which HDFS block is being used
- private static class HDFSBlocks {
- BlockLocation[] hdfsBlocks;
- int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
- int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
-
- private HDFSBlocks(BlockLocation[] hdfsBlocks) {
- this.hdfsBlocks = hdfsBlocks;
- Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
- @Override
- public int compare(BlockLocation b1, BlockLocation b2) {
- return Long.signum(b1.getOffset() - b2.getOffset());
- }
- };
- Arrays.sort(hdfsBlocks, comparator);
- }
-
- private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
- BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
- return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
- }
-
- /**
- * @param rowGroupMetadata the metadata of the row group to check
- * @return true if the mid-point of the row group falls in a new HDFS block (the current HDFS block pointer is also advanced to the block containing the row group);
- * false if the mid-point of the row group is still in the same HDFS block
- */
- private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
- boolean isNewHdfsBlock = false;
- long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
-
- //if mid point is not in the current HDFS block any more, return true
- while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
- isNewHdfsBlock = true;
- currentMidPointHDFSBlockIndex++;
- if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
- throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
- + rowGroupMidPoint
- + ", the end of the hdfs block is "
- + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
- }
-
- while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
- currentStartHdfsBlockIndex++;
- if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
- throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
- + rowGroupMetadata.getStartingPos()
- + " but the end of hdfs blocks of file is "
- + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
- }
- return isNewHdfsBlock;
- }
-
- public BlockLocation getCurrentBlock() {
- return hdfsBlocks[currentStartHdfsBlockIndex];
- }
- }
-
- static class SplitInfo {
- List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
- BlockLocation hdfsBlock;
- long compressedByteSize = 0L;
-
- public SplitInfo(BlockLocation currentBlock) {
- this.hdfsBlock = currentBlock;
- }
-
- private void addRowGroup(BlockMetaData rowGroup) {
- this.rowGroups.add(rowGroup);
- this.compressedByteSize += rowGroup.getCompressedSize();
- }
-
- public long getCompressedByteSize() {
- return compressedByteSize;
- }
-
- public List<BlockMetaData> getRowGroups() {
- return rowGroups;
- }
-
- int getRowGroupCount() {
- return rowGroups.size();
- }
-
- public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, String requestedSchema, Map<String, String> readSupportMetadata) throws IOException {
- MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
- long length = 0;
-
- for (BlockMetaData block : this.getRowGroups()) {
- List<ColumnChunkMetaData> columns = block.getColumns();
- for (ColumnChunkMetaData column : columns) {
- if (requested.containsPath(column.getPath().toArray())) {
- length += column.getTotalSize();
- }
- }
- }
-
- BlockMetaData lastRowGroup = this.getRowGroups().get(this.getRowGroupCount() - 1);
- long end = lastRowGroup.getStartingPos() + lastRowGroup.getTotalByteSize();
-
- long[] rowGroupOffsets = new long[this.getRowGroupCount()];
- for (int i = 0; i < rowGroupOffsets.length; i++) {
- rowGroupOffsets[i] = this.getRowGroups().get(i).getStartingPos();
- }
-
- return new ParquetInputSplit(
- fileStatus.getPath(),
- hdfsBlock.getOffset(),
- end,
- length,
- hdfsBlock.getHosts(),
- rowGroupOffsets
- );
- }
- }
-
- private static final Log LOG = Log.getLog(ClientSideMetadataSplitStrategy.class);
-
- List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers,
- long maxSplitSize, long minSplitSize, ReadContext readContext)
- throws IOException {
- List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
- Filter filter = ParquetInputFormat.getFilter(configuration);
-
- long rowGroupsDropped = 0;
- long totalRowGroups = 0;
-
- for (Footer footer : footers) {
- final Path file = footer.getFile();
- LOG.debug(file);
- FileSystem fs = file.getFileSystem(configuration);
- FileStatus fileStatus = fs.getFileStatus(file);
- ParquetMetadata parquetMetaData = footer.getParquetMetadata();
- List<BlockMetaData> blocks = parquetMetaData.getBlocks();
-
- List<BlockMetaData> filteredBlocks;
-
- totalRowGroups += blocks.size();
- filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
- rowGroupsDropped += blocks.size() - filteredBlocks.size();
-
- if (filteredBlocks.isEmpty()) {
- continue;
- }
-
- BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
- splits.addAll(
- generateSplits(
- filteredBlocks,
- fileBlockLocations,
- fileStatus,
- readContext.getRequestedSchema().toString(),
- readContext.getReadSupportMetadata(),
- minSplitSize,
- maxSplitSize)
- );
- }
-
- if (rowGroupsDropped > 0 && totalRowGroups > 0) {
- int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
- LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
- } else {
- LOG.info("There were no row groups that could be dropped due to filter predicates");
- }
- return splits;
- }
-
- /**
- * groups together all the data blocks for the same HDFS block
- *
- * @param rowGroupBlocks data blocks (row groups)
- * @param hdfsBlocksArray hdfs blocks
- * @param fileStatus the containing file
- * @param requestedSchema the schema requested by the user
- * @param readSupportMetadata the metadata provided by the readSupport implementation in init
- * @param minSplitSize the mapred.min.split.size
- * @param maxSplitSize the mapred.max.split.size
- * @return the splits (one per HDFS block)
- * @throws IOException If hosts can't be retrieved for the HDFS block
- */
- static <T> List<ParquetInputSplit> generateSplits(
- List<BlockMetaData> rowGroupBlocks,
- BlockLocation[] hdfsBlocksArray,
- FileStatus fileStatus,
- String requestedSchema,
- Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
-
- List<SplitInfo> splitRowGroups =
- generateSplitInfo(rowGroupBlocks, hdfsBlocksArray, minSplitSize, maxSplitSize);
-
- //generate splits from rowGroups of each split
- List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
- for (SplitInfo splitInfo : splitRowGroups) {
- ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, requestedSchema, readSupportMetadata);
- resultSplits.add(split);
- }
- return resultSplits;
- }
-
- static List<SplitInfo> generateSplitInfo(
- List<BlockMetaData> rowGroupBlocks,
- BlockLocation[] hdfsBlocksArray,
- long minSplitSize, long maxSplitSize) {
- List<SplitInfo> splitRowGroups;
-
- if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
- throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
- }
- HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
- hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
- SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
-
- //assign rowGroups to splits
- splitRowGroups = new ArrayList<SplitInfo>();
- checkSorted(rowGroupBlocks);//assert row groups are sorted
- for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
- if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
- && currentSplit.getCompressedByteSize() >= minSplitSize
- && currentSplit.getCompressedByteSize() > 0)
- || currentSplit.getCompressedByteSize() >= maxSplitSize) {
- //create a new split
- splitRowGroups.add(currentSplit);//finish previous split
- currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
- }
- currentSplit.addRowGroup(rowGroupMetadata);
- }
-
- if (currentSplit.getRowGroupCount() > 0) {
- splitRowGroups.add(currentSplit);
- }
-
- return splitRowGroups;
- }
-
- private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
- long previousOffset = 0L;
- for (BlockMetaData rowGroup : rowGroupBlocks) {
- long currentOffset = rowGroup.getStartingPos();
- if (currentOffset < previousOffset) {
- throw new ParquetDecodingException("row groups are not sorted: previous row group starts at " + previousOffset + ", current row group starts at " + currentOffset);
- }
- previousOffset = currentOffset; // remember this offset so the next row group is compared against its predecessor
- }
- }
-}
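For reference, a minimal sketch of a MapReduce driver wired up against the ParquetInputFormat above, using only the static helpers shown in this file. GroupReadSupport (from parquet.hadoop.example) and the Hadoop 2 style Job.getInstance(...) are assumptions for illustration, not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import parquet.hadoop.ParquetInputFormat;
import parquet.hadoop.example.GroupReadSupport; // assumed ReadSupport that materializes Group records

public class ReadParquetJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "read-parquet");

    // Records are materialized through a ReadSupport implementation (parquet.read.support.class).
    job.setInputFormatClass(ParquetInputFormat.class);
    ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class);

    // Keep split planning cheap: read footers on the task side (the default).
    ParquetInputFormat.setTaskSideMetaData(job, true);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    // ... configure the mapper, output format and output path, then job.waitForCompletion(true)
  }
}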
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
deleted file mode 100644
index 9bb422c..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-/**
- * An input split for the Parquet format.
- * It contains the information to read one block of the file.
- *
- * This class is private to the ParquetInputFormat.
- * Backward compatibility is not maintained.
- *
- * @author Julien Le Dem
- */
-@Private
-public class ParquetInputSplit extends FileSplit implements Writable {
-
-
- private long end;
- private long[] rowGroupOffsets;
-
- /**
- * Writables must have a parameterless constructor
- */
- public ParquetInputSplit() {
- super(null, 0, 0, new String[0]);
- }
-
- /**
- * For compatibility only
- * use {@link ParquetInputSplit#ParquetInputSplit(Path, long, long, long, String[], long[])}
- * @param path
- * @param start
- * @param length
- * @param hosts
- * @param blocks
- * @param requestedSchema
- * @param fileSchema
- * @param extraMetadata
- * @param readSupportMetadata
- */
- @Deprecated
- public ParquetInputSplit(
- Path path,
- long start,
- long length,
- String[] hosts,
- List<BlockMetaData> blocks,
- String requestedSchema,
- String fileSchema,
- Map<String, String> extraMetadata,
- Map<String, String> readSupportMetadata) {
- this(path, start, length, end(blocks, requestedSchema), hosts, offsets(blocks));
- }
-
- private static long end(List<BlockMetaData> blocks, String requestedSchema) {
- MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
- long length = 0;
-
- for (BlockMetaData block : blocks) {
- List<ColumnChunkMetaData> columns = block.getColumns();
- for (ColumnChunkMetaData column : columns) {
- if (requested.containsPath(column.getPath().toArray())) {
- length += column.getTotalSize();
- }
- }
- }
- return length;
- }
-
- private static long[] offsets(List<BlockMetaData> blocks) {
- long[] offsets = new long[blocks.size()];
- for (int i = 0; i < offsets.length; i++) {
- offsets[i] = blocks.get(i).getStartingPos();
- }
- return offsets;
- }
-
- /**
- * @return the block meta data
- * @deprecated the file footer is no longer read before creating input splits
- */
- @Deprecated
- public List<BlockMetaData> getBlocks() {
- throw new UnsupportedOperationException(
- "Splits no longer have row group metadata, see PARQUET-234");
- }
-
- /**
- * Builds a {@code ParquetInputSplit} from a mapreduce {@link FileSplit}.
- *
- * @param split a mapreduce FileSplit
- * @return a ParquetInputSplit
- * @throws IOException
- */
- static ParquetInputSplit from(FileSplit split) throws IOException {
- return new ParquetInputSplit(split.getPath(),
- split.getStart(), split.getStart() + split.getLength(),
- split.getLength(), split.getLocations(), null);
- }
-
- /**
- * Builds a {@code ParquetInputSplit} from a mapred
- * {@link org.apache.hadoop.mapred.FileSplit}.
- *
- * @param split a mapred FileSplit
- * @return a ParquetInputSplit
- * @throws IOException
- */
- static ParquetInputSplit from(org.apache.hadoop.mapred.FileSplit split) throws IOException {
- return new ParquetInputSplit(split.getPath(),
- split.getStart(), split.getStart() + split.getLength(),
- split.getLength(), split.getLocations(), null);
- }
-
- /**
- * @param file the path of the file for that split
- * @param start the start offset in the file
- * @param end the end offset in the file
- * @param length the actual size in bytes that we expect to read
- * @param hosts the hosts with the replicas of this data
- * @param rowGroupOffsets the offsets of the rowgroups selected if loaded on the client
- */
- public ParquetInputSplit(
- Path file, long start, long end, long length, String[] hosts,
- long[] rowGroupOffsets) {
- super(file, start, length, hosts);
- this.end = end;
- this.rowGroupOffsets = rowGroupOffsets;
- }
-
- /**
- * @return the requested schema
- * @deprecated the file footer is no longer read before creating input splits
- */
- @Deprecated
- String getRequestedSchema() {
- throw new UnsupportedOperationException(
- "Splits no longer have the requested schema, see PARQUET-234");
- }
-
- /**
- * @return the file schema
- * @deprecated the file footer is no longer read before creating input splits
- */
- @Deprecated
- public String getFileSchema() {
- throw new UnsupportedOperationException(
- "Splits no longer have the file schema, see PARQUET-234");
- }
-
- /**
- * @return the end offset of that split
- */
- public long getEnd() {
- return end;
- }
-
- /**
- * @return app specific metadata from the file
- * @deprecated the file footer is no longer read before creating input splits
- */
- @Deprecated
- public Map<String, String> getExtraMetadata() {
- throw new UnsupportedOperationException(
- "Splits no longer have file metadata, see PARQUET-234");
- }
-
- /**
- * @return app specific metadata provided by the read support in the init phase
- */
- @Deprecated
- Map<String, String> getReadSupportMetadata() {
- throw new UnsupportedOperationException(
- "Splits no longer have read-support metadata, see PARQUET-234");
- }
-
- /**
- * @return the offsets of the row group selected if this has been determined on the client side
- */
- public long[] getRowGroupOffsets() {
- return rowGroupOffsets;
- }
-
- @Override
- public String toString() {
- String hosts;
- try{
- hosts = Arrays.toString(getLocations());
- } catch (Exception e) {
- // IOException/InterruptedException could be thrown
- hosts = "(" + e + ")";
- }
-
- return this.getClass().getSimpleName() + "{" +
- "part: " + getPath()
- + " start: " + getStart()
- + " end: " + getEnd()
- + " length: " + getLength()
- + " hosts: " + hosts
- + (rowGroupOffsets == null ? "" : (" row groups: " + Arrays.toString(rowGroupOffsets)))
- + "}";
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void readFields(DataInput hin) throws IOException {
- byte[] bytes = readArray(hin);
- DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)));
- super.readFields(in);
- this.end = in.readLong();
- if (in.readBoolean()) {
- this.rowGroupOffsets = new long[in.readInt()];
- for (int i = 0; i < rowGroupOffsets.length; i++) {
- rowGroupOffsets[i] = in.readLong();
- }
- }
- in.close();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void write(DataOutput hout) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
- super.write(out);
- out.writeLong(end);
- out.writeBoolean(rowGroupOffsets != null);
- if (rowGroupOffsets != null) {
- out.writeInt(rowGroupOffsets.length);
- for (long o : rowGroupOffsets) {
- out.writeLong(o);
- }
- }
- out.close();
- writeArray(hout, baos.toByteArray());
- }
-
- private static void writeArray(DataOutput out, byte[] bytes) throws IOException {
- out.writeInt(bytes.length);
- out.write(bytes, 0, bytes.length);
- }
-
- private static byte[] readArray(DataInput in) throws IOException {
- int len = in.readInt();
- byte[] bytes = new byte[len];
- in.readFully(bytes);
- return bytes;
- }
-
-}
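A small sketch of the gzip-compressed Writable round trip implemented by the write/readFields methods above; the path, offsets and host name are made-up values for illustration only:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetInputSplit;

public class SplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // start=0, end=1024, length=1024, one replica host, one row group at offset 4 (hypothetical values)
    ParquetInputSplit split = new ParquetInputSplit(
        new Path("/tmp/part-00000.parquet"), 0, 1024, 1024,
        new String[] { "host1" }, new long[] { 4 });

    // write() gzips the serialized fields into a length-prefixed byte array
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    split.write(new DataOutputStream(baos));

    // readFields() reverses the process on a freshly constructed split
    ParquetInputSplit copy = new ParquetInputSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    System.out.println(copy);
  }
}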
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
deleted file mode 100644
index 0e0ce42..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-
-import parquet.Log;
-import parquet.hadoop.util.ContextUtil;
-
-public class ParquetOutputCommitter extends FileOutputCommitter {
- private static final Log LOG = Log.getLog(ParquetOutputCommitter.class);
-
- private final Path outputPath;
-
- public ParquetOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
- super(outputPath, context);
- this.outputPath = outputPath;
- }
-
- public void commitJob(JobContext jobContext) throws IOException {
- super.commitJob(jobContext);
- Configuration configuration = ContextUtil.getConfiguration(jobContext);
- writeMetaDataFile(configuration,outputPath);
- }
-
- public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
- if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
- try {
- final FileSystem fileSystem = outputPath.getFileSystem(configuration);
- FileStatus outputStatus = fileSystem.getFileStatus(outputPath);
- List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus);
- try {
- ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers);
- } catch (Exception e) {
- LOG.warn("could not write summary file for " + outputPath, e);
- final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE);
- if (fileSystem.exists(metadataPath)) {
- fileSystem.delete(metadataPath, true);
- }
- }
- } catch (Exception e) {
- LOG.warn("could not write summary file for " + outputPath, e);
- }
- }
- }
-
-}
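A hedged sketch of calling the static writeMetaDataFile helper above to (re)generate the _metadata summary for an existing output directory; the directory path is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetOutputCommitter;

public class RegenerateSummary {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Summary aggregation is controlled by parquet.enable.summary-metadata (default: true).
    // conf.setBoolean("parquet.enable.summary-metadata", false);

    // Reads all footers under the directory and writes the _metadata summary file.
    // On failure it only logs a warning and deletes any partial summary file.
    ParquetOutputCommitter.writeMetaDataFile(conf, new Path("/tmp/parquet-output")); // placeholder path
  }
}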
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
deleted file mode 100644
index 43648f4..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.INFO;
-import static parquet.Preconditions.checkNotNull;
-import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
-import static parquet.hadoop.util.ContextUtil.getConfiguration;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-import parquet.Log;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.api.WriteSupport.WriteContext;
-import parquet.hadoop.codec.CodecConfig;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ConfigurationUtil;
-
-/**
- * OutputFormat to write to a Parquet file
- *
- * It requires a {@link WriteSupport} to convert the actual records to the underlying format.
- * It requires the schema of the incoming records (provided by the write support).
- * It allows storing extra metadata in the footer (for example, for schema compatibility purposes when converting from a different schema language).
- *
- * The format configuration settings in the job configuration:
- * <pre>
- * # The block size is the size of a row group being buffered in memory
- * # this limits the memory usage when writing
- * # Larger values will improve the IO when reading but consume more memory when writing
- * parquet.block.size=134217728 # in bytes, default = 128 * 1024 * 1024
- *
- * # The page size is for compression. When reading, each page can be decompressed independently.
- * # A block is composed of pages. The page is the smallest unit that must be read fully to access a single record.
- * # If this value is too small, the compression will deteriorate
- * parquet.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
- *
- * # There is one dictionary page per column per row group when dictionary encoding is used.
- * # The dictionary page size works like the page size but for dictionary
- * parquet.dictionary.page.size=1048576 # in bytes, default = 1 * 1024 * 1024
- *
- * # The compression algorithm used to compress pages
- * parquet.compression=UNCOMPRESSED # one of: UNCOMPRESSED, SNAPPY, GZIP, LZO. Default: UNCOMPRESSED. Supersedes mapred.output.compress*
- *
- * # The write support class to convert the records written to the OutputFormat into the events accepted by the record consumer
- * # Usually provided by a specific ParquetOutputFormat subclass
- * parquet.write.support.class= # fully qualified name
- *
- * # To enable/disable dictionary encoding
- * parquet.enable.dictionary=true # false to disable dictionary encoding
- *
- * # To enable/disable summary metadata aggregation at the end of a MR job
- * # The default is true (enabled)
- * parquet.enable.summary-metadata=true # false to disable summary aggregation
- * </pre>
- *
- * If parquet.compression is not set, the following properties are checked (FileOutputFormat behavior).
- * Note that we explicitly disallow custom Codecs
- * <pre>
- * mapred.output.compress=true
- * mapred.output.compression.codec=org.apache.hadoop.io.compress.SomeCodec # the codec must be one of Snappy, GZip or LZO
- * </pre>
- *
- * If none of those is set, the data is uncompressed.
- *
- * @author Julien Le Dem
- *
- * @param <T> the type of the materialized records
- */
-public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
- private static final Log LOG = Log.getLog(ParquetOutputFormat.class);
-
- public static final String BLOCK_SIZE = "parquet.block.size";
- public static final String PAGE_SIZE = "parquet.page.size";
- public static final String COMPRESSION = "parquet.compression";
- public static final String WRITE_SUPPORT_CLASS = "parquet.write.support.class";
- public static final String DICTIONARY_PAGE_SIZE = "parquet.dictionary.page.size";
- public static final String ENABLE_DICTIONARY = "parquet.enable.dictionary";
- public static final String VALIDATION = "parquet.validation";
- public static final String WRITER_VERSION = "parquet.writer.version";
- public static final String ENABLE_JOB_SUMMARY = "parquet.enable.summary-metadata";
- public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio";
- public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
-
- public static void setWriteSupportClass(Job job, Class<?> writeSupportClass) {
- getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
- }
-
- public static void setWriteSupportClass(JobConf job, Class<?> writeSupportClass) {
- job.set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
- }
-
- public static Class<?> getWriteSupportClass(Configuration configuration) {
- final String className = configuration.get(WRITE_SUPPORT_CLASS);
- if (className == null) {
- return null;
- }
- final Class<?> writeSupportClass = ConfigurationUtil.getClassFromConfig(configuration, WRITE_SUPPORT_CLASS, WriteSupport.class);
- return writeSupportClass;
- }
-
- public static void setBlockSize(Job job, int blockSize) {
- getConfiguration(job).setInt(BLOCK_SIZE, blockSize);
- }
-
- public static void setPageSize(Job job, int pageSize) {
- getConfiguration(job).setInt(PAGE_SIZE, pageSize);
- }
-
- public static void setDictionaryPageSize(Job job, int pageSize) {
- getConfiguration(job).setInt(DICTIONARY_PAGE_SIZE, pageSize);
- }
-
- public static void setCompression(Job job, CompressionCodecName compression) {
- getConfiguration(job).set(COMPRESSION, compression.name());
- }
-
- public static void setEnableDictionary(Job job, boolean enableDictionary) {
- getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary);
- }
-
- public static boolean getEnableDictionary(JobContext jobContext) {
- return getEnableDictionary(getConfiguration(jobContext));
- }
-
- public static int getBlockSize(JobContext jobContext) {
- return getBlockSize(getConfiguration(jobContext));
- }
-
- public static int getPageSize(JobContext jobContext) {
- return getPageSize(getConfiguration(jobContext));
- }
-
- public static int getDictionaryPageSize(JobContext jobContext) {
- return getDictionaryPageSize(getConfiguration(jobContext));
- }
-
- public static CompressionCodecName getCompression(JobContext jobContext) {
- return getCompression(getConfiguration(jobContext));
- }
-
- public static boolean isCompressionSet(JobContext jobContext) {
- return isCompressionSet(getConfiguration(jobContext));
- }
-
- public static void setValidation(JobContext jobContext, boolean validating) {
- setValidation(getConfiguration(jobContext), validating);
- }
-
- public static boolean getValidation(JobContext jobContext) {
- return getValidation(getConfiguration(jobContext));
- }
-
- public static boolean getEnableDictionary(Configuration configuration) {
- return configuration.getBoolean(ENABLE_DICTIONARY, true);
- }
-
- @Deprecated
- public static int getBlockSize(Configuration configuration) {
- return configuration.getInt(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
- }
-
- public static long getLongBlockSize(Configuration configuration) {
- return configuration.getLong(BLOCK_SIZE, DEFAULT_BLOCK_SIZE);
- }
-
- public static int getPageSize(Configuration configuration) {
- return configuration.getInt(PAGE_SIZE, DEFAULT_PAGE_SIZE);
- }
-
- public static int getDictionaryPageSize(Configuration configuration) {
- return configuration.getInt(DICTIONARY_PAGE_SIZE, DEFAULT_PAGE_SIZE);
- }
-
- public static WriterVersion getWriterVersion(Configuration configuration) {
- String writerVersion = configuration.get(WRITER_VERSION, WriterVersion.PARQUET_1_0.toString());
- return WriterVersion.fromString(writerVersion);
- }
-
- public static CompressionCodecName getCompression(Configuration configuration) {
- return CodecConfig.getParquetCompressionCodec(configuration);
- }
-
- public static boolean isCompressionSet(Configuration configuration) {
- return CodecConfig.isParquetCompressionSet(configuration);
- }
-
- public static void setValidation(Configuration configuration, boolean validating) {
- configuration.setBoolean(VALIDATION, validating);
- }
-
- public static boolean getValidation(Configuration configuration) {
- return configuration.getBoolean(VALIDATION, false);
- }
-
- private CompressionCodecName getCodec(TaskAttemptContext taskAttemptContext) {
- return CodecConfig.from(taskAttemptContext).getCodec();
- }
-
-
-
- private WriteSupport<T> writeSupport;
- private ParquetOutputCommitter committer;
-
- /**
- * constructor used when this OutputFormat is wrapped in another one (in Pig for example)
- * @param writeSupport the write support used to convert the incoming records
- */
- public <S extends WriteSupport<T>> ParquetOutputFormat(S writeSupport) {
- this.writeSupport = writeSupport;
- }
-
- /**
- * used when directly using the output format and configuring the write support implementation
- * using parquet.write.support.class
- */
- public <S extends WriteSupport<T>> ParquetOutputFormat() {
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
-
- final Configuration conf = getConfiguration(taskAttemptContext);
-
- CompressionCodecName codec = getCodec(taskAttemptContext);
- String extension = codec.getExtension() + ".parquet";
- Path file = getDefaultWorkFile(taskAttemptContext, extension);
- return getRecordWriter(conf, file, codec);
- }
-
- public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext, Path file)
- throws IOException, InterruptedException {
- return getRecordWriter(getConfiguration(taskAttemptContext), file, getCodec(taskAttemptContext));
- }
-
- public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
- throws IOException, InterruptedException {
- final WriteSupport<T> writeSupport = getWriteSupport(conf);
-
- CodecFactory codecFactory = new CodecFactory(conf);
- long blockSize = getLongBlockSize(conf);
- if (INFO) LOG.info("Parquet block size to " + blockSize);
- int pageSize = getPageSize(conf);
- if (INFO) LOG.info("Parquet page size to " + pageSize);
- int dictionaryPageSize = getDictionaryPageSize(conf);
- if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
- boolean enableDictionary = getEnableDictionary(conf);
- if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
- boolean validating = getValidation(conf);
- if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
- WriterVersion writerVersion = getWriterVersion(conf);
- if (INFO) LOG.info("Writer version is: " + writerVersion);
-
- WriteContext init = writeSupport.init(conf);
- ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
- w.start();
-
- float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
- MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
- long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
- MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
- if (memoryManager == null) {
- memoryManager = new MemoryManager(maxLoad, minAllocation);
- } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
- LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
- "be reset by the new value: " + maxLoad);
- }
-
- return new ParquetRecordWriter<T>(
- w,
- writeSupport,
- init.getSchema(),
- init.getExtraMetaData(),
- blockSize, pageSize,
- codecFactory.getCompressor(codec, pageSize),
- dictionaryPageSize,
- enableDictionary,
- validating,
- writerVersion,
- memoryManager);
- }
-
- /**
- * @param configuration to find the configuration for the write support class
- * @return the configured write support
- */
- @SuppressWarnings("unchecked")
- public WriteSupport<T> getWriteSupport(Configuration configuration){
- if (writeSupport != null) return writeSupport;
- Class<?> writeSupportClass = getWriteSupportClass(configuration);
- try {
- return (WriteSupport<T>)checkNotNull(writeSupportClass, "writeSupportClass").newInstance();
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
- } catch (IllegalAccessException e) {
- throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
- }
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context)
- throws IOException {
- if (committer == null) {
- Path output = getOutputPath(context);
- committer = new ParquetOutputCommitter(output, context);
- }
- return committer;
- }
-
-
- /**
- * This memory manager is for all the real writers (InternalParquetRecordWriter) in one task.
- */
- private static MemoryManager memoryManager;
-
- static MemoryManager getMemoryManager() {
- return memoryManager;
- }
-}
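And a matching sketch of a write-side driver using the configuration knobs documented in the class Javadoc above. GroupWriteSupport (from parquet.hadoop.example), its setSchema helper, and the example schema are assumptions for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.example.GroupWriteSupport; // assumed WriteSupport for Group records
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class WriteParquetJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // The write support provides the schema of the outgoing records (example schema only).
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int64 id; optional binary name (UTF8); }");
    GroupWriteSupport.setSchema(schema, conf);

    Job job = Job.getInstance(conf, "write-parquet");
    job.setOutputFormatClass(ParquetOutputFormat.class);
    ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);

    // parquet.block.size / parquet.page.size / parquet.compression / parquet.enable.dictionary
    ParquetOutputFormat.setBlockSize(job, 128 * 1024 * 1024);
    ParquetOutputFormat.setPageSize(job, 1024 * 1024);
    ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    ParquetOutputFormat.setEnableDictionary(job, true);

    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    // ... configure mapper output types, then job.waitForCompletion(true)
  }
}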
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
deleted file mode 100644
index 7c3ecbb..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Preconditions.checkNotNull;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.filter2.compat.RowGroupFilter;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.HiddenFileFilter;
-import parquet.schema.MessageType;
-
-/**
- * Read records from a Parquet file.
- * TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
- */
-public class ParquetReader<T> implements Closeable {
-
- private final ReadSupport<T> readSupport;
- private final Configuration conf;
- private final Iterator<Footer> footersIterator;
- private final Filter filter;
-
- private InternalParquetRecordReader<T> reader;
-
- /**
- * @param file the file to read
- * @param readSupport to materialize records
- * @throws IOException
- * @deprecated use {@link #builder(ReadSupport, Path)}
- */
- @Deprecated
- public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
- this(new Configuration(), file, readSupport, FilterCompat.NOOP);
- }
-
- /**
- * @param conf the configuration
- * @param file the file to read
- * @param readSupport to materialize records
- * @throws IOException
- * @deprecated use {@link #builder(ReadSupport, Path)}
- */
- @Deprecated
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
- this(conf, file, readSupport, FilterCompat.NOOP);
- }
-
- /**
- * @param file the file to read
- * @param readSupport to materialize records
- * @param unboundRecordFilter the filter to use to filter records
- * @throws IOException
- * @deprecated use {@link #builder(ReadSupport, Path)}
- */
- @Deprecated
- public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
- this(new Configuration(), file, readSupport, FilterCompat.get(unboundRecordFilter));
- }
-
- /**
- * @param conf the configuration
- * @param file the file to read
- * @param readSupport to materialize records
- * @param unboundRecordFilter the filter to use to filter records
- * @throws IOException
- * @deprecated use {@link #builder(ReadSupport, Path)}
- */
- @Deprecated
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
- this(conf, file, readSupport, FilterCompat.get(unboundRecordFilter));
- }
-
- private ParquetReader(Configuration conf,
- Path file,
- ReadSupport<T> readSupport,
- Filter filter) throws IOException {
- this.readSupport = readSupport;
- this.filter = checkNotNull(filter, "filter");
- this.conf = conf;
-
- FileSystem fs = file.getFileSystem(conf);
- List<FileStatus> statuses = Arrays.asList(fs.listStatus(file, HiddenFileFilter.INSTANCE));
- List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses, false);
- this.footersIterator = footers.iterator();
- }
-
- /**
- * @return the next record or null if finished
- * @throws IOException
- */
- public T read() throws IOException {
- try {
- if (reader != null && reader.nextKeyValue()) {
- return reader.getCurrentValue();
- } else {
- initReader();
- return reader == null ? null : read();
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- private void initReader() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- if (footersIterator.hasNext()) {
- Footer footer = footersIterator.next();
-
- List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
-
- MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
-
- List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
- filter, blocks, fileSchema);
-
- reader = new InternalParquetRecordReader<T>(readSupport, filter);
- reader.initialize(fileSchema,
- footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
- footer.getFile(), filteredBlocks, conf);
- }
- }
-
- @Override
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- }
-
- public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
- return new Builder<T>(readSupport, path);
- }
-
- public static class Builder<T> {
- private final ReadSupport<T> readSupport;
- private final Path file;
- private Configuration conf;
- private Filter filter;
-
- private Builder(ReadSupport<T> readSupport, Path path) {
- this.readSupport = checkNotNull(readSupport, "readSupport");
- this.file = checkNotNull(path, "path");
- this.conf = new Configuration();
- this.filter = FilterCompat.NOOP;
- }
-
- public Builder<T> withConf(Configuration conf) {
- this.conf = checkNotNull(conf, "conf");
- return this;
- }
-
- public Builder<T> withFilter(Filter filter) {
- this.filter = checkNotNull(filter, "filter");
- return this;
- }
-
- public ParquetReader<T> build() throws IOException {
- return new ParquetReader<T>(conf, file, readSupport, filter);
- }
- }
-}
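
The Builder at the end of this class is the intended entry point for new code (the public constructors are all deprecated). A minimal usage sketch, assuming the example GroupReadSupport and Group classes bundled with parquet-hadoop, which are not part of this excerpt:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import parquet.example.data.Group;
    import parquet.filter2.compat.FilterCompat;
    import parquet.hadoop.ParquetReader;
    import parquet.hadoop.example.GroupReadSupport;

    public class ReadSketch {
      public static void main(String[] args) throws Exception {
        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(args[0]))
            .withConf(new Configuration())  // optional: the builder already defaults to a new Configuration
            .withFilter(FilterCompat.NOOP)  // optional: NOOP is also the default
            .build();
        try {
          // read() walks every row group of every footer found under the path
          // (hidden files are skipped) and returns null once all are exhausted.
          for (Group record = reader.read(); record != null; record = reader.read()) {
            System.out.println(record);
          }
        } finally {
          reader.close();
        }
      }
    }
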
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
deleted file mode 100644
index abf65c1..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.filter2.compat.RowGroupFilter.filterRowGroups;
-import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static parquet.format.converter.ParquetMetadataConverter.range;
-import static parquet.hadoop.ParquetFileReader.readFooter;
-import static parquet.hadoop.ParquetInputFormat.getFilter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import parquet.Log;
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.hadoop.util.ContextUtil;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.schema.MessageType;
-
-/**
- * Reads the records from a block of a Parquet file
- *
- * @see ParquetInputFormat
- *
- * @author Julien Le Dem
- *
- * @param <T> type of the materialized records
- */
-public class ParquetRecordReader<T> extends RecordReader<Void, T> {
-
- private static final Log LOG = Log.getLog(ParquetRecordReader.class);
- private final InternalParquetRecordReader<T> internalReader;
-
- /**
- * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
- */
- public ParquetRecordReader(ReadSupport<T> readSupport) {
- this(readSupport, FilterCompat.NOOP);
- }
-
- /**
- * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
- * @param filter for filtering individual records
- */
- public ParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
- internalReader = new InternalParquetRecordReader<T>(readSupport, filter);
- }
-
- /**
- * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
- * @param filter for filtering individual records
- * @deprecated use {@link #ParquetRecordReader(ReadSupport, Filter)}
- */
- @Deprecated
- public ParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
- this(readSupport, FilterCompat.get(filter));
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close() throws IOException {
- internalReader.close();
- }
-
- /**
- * always returns null
- */
- @Override
- public Void getCurrentKey() throws IOException, InterruptedException {
- return null;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public T getCurrentValue() throws IOException,
- InterruptedException {
- return internalReader.getCurrentValue();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return internalReader.getProgress();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) {
- BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>) context);
- } else {
- LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is "
- + context.getClass().getCanonicalName());
- }
-
- initializeInternalReader(toParquetSplit(inputSplit), ContextUtil.getConfiguration(context));
- }
-
- public void initialize(InputSplit inputSplit, Configuration configuration, Reporter reporter)
- throws IOException, InterruptedException {
- BenchmarkCounter.initCounterFromReporter(reporter, configuration);
- initializeInternalReader(toParquetSplit(inputSplit), configuration);
- }
-
- private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
- Path path = split.getPath();
- long[] rowGroupOffsets = split.getRowGroupOffsets();
- List<BlockMetaData> filteredBlocks;
- ParquetMetadata footer;
- // if task.side.metadata is set, rowGroupOffsets is null
- if (rowGroupOffsets == null) {
- // then we need to apply the predicate push down filter
- footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
- MessageType fileSchema = footer.getFileMetaData().getSchema();
- Filter filter = getFilter(configuration);
- filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
- } else {
- // otherwise we find the row groups that were selected on the client
- footer = readFooter(configuration, path, NO_FILTER);
- Set<Long> offsets = new HashSet<Long>();
- for (long offset : rowGroupOffsets) {
- offsets.add(offset);
- }
- filteredBlocks = new ArrayList<BlockMetaData>();
- for (BlockMetaData block : footer.getBlocks()) {
- if (offsets.contains(block.getStartingPos())) {
- filteredBlocks.add(block);
- }
- }
- // verify we found them all
- if (filteredBlocks.size() != rowGroupOffsets.length) {
- long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
- for (int i = 0; i < foundRowGroupOffsets.length; i++) {
- foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
- }
- // this should never happen.
- // provide a good error message in case there's a bug
- throw new IllegalStateException(
- "All the offsets listed in the split should be found in the file."
- + " expected: " + Arrays.toString(rowGroupOffsets)
- + " found: " + filteredBlocks
- + " out of: " + Arrays.toString(foundRowGroupOffsets)
- + " in range " + split.getStart() + ", " + split.getEnd());
- }
- }
- MessageType fileSchema = footer.getFileMetaData().getSchema();
- Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
- internalReader.initialize(
- fileSchema, fileMetaData, path, filteredBlocks, configuration);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return internalReader.nextKeyValue();
- }
-
- private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
- if (split instanceof ParquetInputSplit) {
- return (ParquetInputSplit) split;
- } else if (split instanceof FileSplit) {
- return ParquetInputSplit.from((FileSplit) split);
- } else if (split instanceof org.apache.hadoop.mapred.FileSplit) {
- return ParquetInputSplit.from(
- (org.apache.hadoop.mapred.FileSplit) split);
- } else {
- throw new IllegalArgumentException(
- "Invalid split (not a FileSplit or ParquetInputSplit): " + split);
- }
- }
-}
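
In a MapReduce job this reader is created by ParquetInputFormat, one per split, and the mapper receives a null key (see getCurrentKey() above) with the materialized record as the value. A hedged sketch of that wiring, again assuming the example GroupReadSupport/Group classes and the static setReadSupportClass helper on ParquetInputFormat, none of which appear in this excerpt:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import parquet.example.data.Group;
    import parquet.hadoop.ParquetInputFormat;
    import parquet.hadoop.example.GroupReadSupport;

    public class ReadJobSketch {
      // The input key type is Void because getCurrentKey() above always returns null.
      public static class EchoMapper extends Mapper<Void, Group, NullWritable, Text> {
        @Override
        protected void map(Void key, Group value, Context context)
            throws IOException, InterruptedException {
          context.write(NullWritable.get(), new Text(value.toString()));
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "parquet-read-sketch");
        job.setJarByClass(ReadJobSketch.class);
        job.setInputFormatClass(ParquetInputFormat.class);
        ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(EchoMapper.class);
        job.setNumReduceTasks(0);  // map-only: records go straight to the output format
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
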
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
deleted file mode 100644
index 968a334..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordWriter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.Map;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.api.WriteSupport;
-import parquet.schema.MessageType;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Writes records to a Parquet file
- *
- * @see ParquetOutputFormat
- *
- * @author Julien Le Dem
- *
- * @param <T> the type of the materialized records
- */
-public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
-
- private InternalParquetRecordWriter<T> internalWriter;
- private MemoryManager memoryManager;
-
- /**
- *
- * @param w the file to write to
- * @param writeSupport the class to convert incoming records
- * @param schema the schema of the records
- * @param extraMetaData extra meta data to write in the footer of the file
- * @param blockSize the size of a block in the file (this will be approximate)
- * @param pageSize the target size of a page in the file (this will be approximate)
- * @param compressor the compressor used to compress the pages
- * @param dictionaryPageSize the threshold for dictionary size
- * @param enableDictionary to enable the dictionary
- * @param validating if schema validation should be turned on
- * @param writerVersion the Parquet format version to write
- * @deprecated use the constructor that also takes a {@link MemoryManager}
- */
- @Deprecated
- public ParquetRecordWriter(
- ParquetFileWriter w,
- WriteSupport<T> writeSupport,
- MessageType schema,
- Map<String, String> extraMetaData,
- int blockSize, int pageSize,
- BytesCompressor compressor,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- WriterVersion writerVersion) {
- internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
- extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
- validating, writerVersion);
- }
-
- /**
- *
- * @param w the file to write to
- * @param writeSupport the class to convert incoming records
- * @param schema the schema of the records
- * @param extraMetaData extra meta data to write in the footer of the file
- * @param blockSize the size of a block in the file (this will be approximate)
- * @param pageSize the target size of a page in the file (this will be approximate)
- * @param compressor the compressor used to compress the pages
- * @param dictionaryPageSize the threshold for dictionary size
- * @param enableDictionary to enable the dictionary
- * @param validating if schema validation should be turned on
- * @param writerVersion the Parquet format version to write
- * @param memoryManager the memory manager that balances memory across the open writers of this task
- */
- public ParquetRecordWriter(
- ParquetFileWriter w,
- WriteSupport<T> writeSupport,
- MessageType schema,
- Map<String, String> extraMetaData,
- long blockSize, int pageSize,
- BytesCompressor compressor,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- WriterVersion writerVersion,
- MemoryManager memoryManager) {
- internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
- extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
- validating, writerVersion);
- this.memoryManager = checkNotNull(memoryManager, "memoryManager");
- memoryManager.addWriter(internalWriter, blockSize);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- internalWriter.close();
- if (memoryManager != null) {
- memoryManager.removeWriter(internalWriter);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void write(Void key, T value) throws IOException, InterruptedException {
- internalWriter.write(value);
- }
-
-}
\ No newline at end of file