You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:08 UTC
[11/51] [partial] parquet-mr git commit: PARQUET-23: Rename to
org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
new file mode 100644
index 0000000..6b69bc9
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.mapred;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Arrays.asList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+
+public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.FileInputFormat<Void, Container<V>> {
+
+ protected ParquetInputFormat<V> realInputFormat = new ParquetInputFormat<V>();
+
+ @Override
+ public RecordReader<Void, Container<V>> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ return new RecordReaderWrapper<V>(split, job, reporter);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ if (isTaskSideMetaData(job)) {
+ return super.getSplits(job, numSplits);
+ }
+
+ List<Footer> footers = getFooters(job);
+ List<ParquetInputSplit> splits = realInputFormat.getSplits(job, footers);
+ if (splits == null) {
+ return null;
+ }
+ InputSplit[] resultSplits = new InputSplit[splits.size()];
+ int i = 0;
+ for (ParquetInputSplit split : splits) {
+ resultSplits[i++] = new ParquetInputSplitWrapper(split);
+ }
+ return resultSplits;
+ }
+
+ public List<Footer> getFooters(JobConf job) throws IOException {
+ return realInputFormat.getFooters(job, asList(super.listStatus(job)));
+ }
+
+ private static class RecordReaderWrapper<V> implements RecordReader<Void, Container<V>> {
+
+ private ParquetRecordReader<V> realReader;
+ private long splitLen; // for getPos()
+
+ private Container<V> valueContainer = null;
+
+ private boolean firstRecord = false;
+ private boolean eof = false;
+
+ public RecordReaderWrapper(
+ InputSplit oldSplit, JobConf oldJobConf, Reporter reporter)
+ throws IOException {
+ splitLen = oldSplit.getLength();
+
+ try {
+ realReader = new ParquetRecordReader<V>(
+ ParquetInputFormat.<V>getReadSupportInstance(oldJobConf),
+ ParquetInputFormat.getFilter(oldJobConf));
+
+ if (oldSplit instanceof ParquetInputSplitWrapper) {
+ realReader.initialize(((ParquetInputSplitWrapper) oldSplit).realSplit, oldJobConf, reporter);
+ } else if (oldSplit instanceof FileSplit) {
+ realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid split (not a FileSplit or ParquetInputSplitWrapper): " + oldSplit);
+ }
+
+ // read once to gain access to key and value objects
+ if (realReader.nextKeyValue()) {
+ firstRecord = true;
+ valueContainer = new Container<V>();
+ valueContainer.set(realReader.getCurrentValue());
+
+ } else {
+ eof = true;
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ realReader.close();
+ }
+
+ @Override
+ public Void createKey() {
+ return null;
+ }
+
+ @Override
+ public Container<V> createValue() {
+ return valueContainer;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return (long) (splitLen * getProgress());
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ try {
+ return realReader.getProgress();
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean next(Void key, Container<V> value) throws IOException {
+ if (eof) {
+ return false;
+ }
+
+ if (firstRecord) { // key & value are already read.
+ firstRecord = false;
+ return true;
+ }
+
+ try {
+ if (realReader.nextKeyValue()) {
+ if (value != null) value.set(realReader.getCurrentValue());
+ return true;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ eof = true; // strictly not required, just for consistency
+ return false;
+ }
+ }
+
+ public static boolean isTaskSideMetaData(JobConf job) {
+ return job.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, TRUE);
+ }
+
+ private static class ParquetInputSplitWrapper implements InputSplit {
+
+ ParquetInputSplit realSplit;
+
+ @SuppressWarnings("unused") // MapReduce instantiates this.
+ public ParquetInputSplitWrapper() {}
+
+ public ParquetInputSplitWrapper(ParquetInputSplit realSplit) {
+ this.realSplit = realSplit;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return realSplit.getLength();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ return realSplit.getLocations();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ realSplit = new ParquetInputSplit();
+ realSplit.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ realSplit.write(out);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
new file mode 100644
index 0000000..33f589b
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetRecordWriter;
+import org.apache.parquet.hadoop.codec.CodecConfig;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.FileOutputFormat<Void, V> {
+
+ public static void setWriteSupportClass(Configuration configuration, Class<?> writeSupportClass) {
+ configuration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, writeSupportClass.getName());
+ }
+
+ public static void setBlockSize(Configuration configuration, int blockSize) {
+ configuration.setInt(ParquetOutputFormat.BLOCK_SIZE, blockSize);
+ }
+
+ public static void setPageSize(Configuration configuration, int pageSize) {
+ configuration.setInt(ParquetOutputFormat.PAGE_SIZE, pageSize);
+ }
+
+ public static void setCompression(Configuration configuration, CompressionCodecName compression) {
+ configuration.set(ParquetOutputFormat.COMPRESSION, compression.name());
+ }
+
+ public static void setEnableDictionary(Configuration configuration, boolean enableDictionary) {
+ configuration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary);
+ }
+
+ public static void setAsOutputFormat(JobConf jobConf) {
+ jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ jobConf.setOutputCommitter(MapredParquetOutputCommitter.class);
+ }
+
+ private CompressionCodecName getCodec(final JobConf conf) {
+ return CodecConfig.from(conf).getCodec();
+ }
+
+ private static Path getDefaultWorkFile(JobConf conf, String name, String extension) {
+ String file = getUniqueName(conf, name) + extension;
+ return new Path(getWorkOutputPath(conf), file);
+ }
+
+ protected ParquetOutputFormat<V> realOutputFormat = new ParquetOutputFormat<V>();
+
+ @Override
+ public RecordWriter<Void, V> getRecordWriter(FileSystem fs,
+ JobConf conf, String name, Progressable progress) throws IOException {
+ return new RecordWriterWrapper(realOutputFormat, fs, conf, name, progress);
+ }
+
+ private class RecordWriterWrapper implements RecordWriter<Void, V> {
+
+ private ParquetRecordWriter<V> realWriter;
+
+ public RecordWriterWrapper(ParquetOutputFormat<V> realOutputFormat,
+ FileSystem fs, JobConf conf, String name, Progressable progress) throws IOException {
+
+ CompressionCodecName codec = getCodec(conf);
+ String extension = codec.getExtension() + ".parquet";
+ Path file = getDefaultWorkFile(conf, name, extension);
+
+ try {
+ realWriter = (ParquetRecordWriter<V>) realOutputFormat.getRecordWriter(conf, file, codec);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ try {
+ realWriter.close(null);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void write(Void key, V value) throws IOException {
+ try {
+ realWriter.write(key, value);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new IOException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
new file mode 100644
index 0000000..0504db8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.parquet.hadoop.ParquetOutputCommitter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+import java.io.IOException;
+
+/**
+ *
+ * Adapter for supporting ParquetOutputCommitter in mapred API
+ *
+ * @author Tianshuo Deng
+ */
+public class MapredParquetOutputCommitter extends FileOutputCommitter {
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ super.commitJob(jobContext);
+ Configuration conf = ContextUtil.getConfiguration(jobContext);
+ Path outputPath = FileOutputFormat.getOutputPath(new JobConf(conf));
+ ParquetOutputCommitter.writeMetaDataFile(conf, outputPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
new file mode 100644
index 0000000..13e6fa8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Block metadata stored in the footer and passed in an InputSplit
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BlockMetaData {
+
+ private List<ColumnChunkMetaData> columns = new ArrayList<ColumnChunkMetaData>();
+ private long rowCount;
+ private long totalByteSize;
+ private String path;
+
+ public BlockMetaData() {
+ }
+
+
+ /**
+ * @param path the path to the file containing the data. Or null if same file the metadata was found
+ */
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ * @return the path relative to the parent of this file where the data is. Or null if it is in the same file.
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * @return the rowCount
+ */
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ /**
+ * @param rowCount the rowCount to set
+ */
+ public void setRowCount(long rowCount) {
+ this.rowCount = rowCount;
+ }
+
+ /**
+ * @return the totalByteSize
+ */
+ public long getTotalByteSize() {
+ return totalByteSize;
+ }
+
+ /**
+ * @param totalByteSize the totalByteSize to set
+ */
+ public void setTotalByteSize(long totalByteSize) {
+ this.totalByteSize = totalByteSize;
+ }
+
+ /**
+ *
+ * @param column the metadata for a column
+ */
+ public void addColumn(ColumnChunkMetaData column) {
+ columns.add(column);
+ }
+
+ /**
+ *
+ * @return the metadata for columns
+ */
+ public List<ColumnChunkMetaData> getColumns() {
+ return Collections.unmodifiableList(columns);
+ }
+
+ /**
+ *
+ * @return the starting pos of first column
+ */
+ public long getStartingPos() {
+ return getColumns().get(0).getStartingPos();
+ }
+ @Override
+ public String toString() {
+ return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
+ }
+
+ /**
+ * @return the compressed size of all columns
+ */
+ public long getCompressedSize() {
+ long totalSize = 0;
+ for (ColumnChunkMetaData col : getColumns()) {
+ totalSize += col.getTotalSize();
+ }
+ return totalSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
new file mode 100644
index 0000000..0c2fd4d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.Set;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Column meta data for a block stored in the file footer and passed in the InputSplit
+ * @author Julien Le Dem
+ */
+abstract public class ColumnChunkMetaData {
+
+ @Deprecated
+ public static ColumnChunkMetaData get(
+ ColumnPath path,
+ PrimitiveTypeName type,
+ CompressionCodecName codec,
+ Set<Encoding> encodings,
+ long firstDataPage,
+ long dictionaryPageOffset,
+ long valueCount,
+ long totalSize,
+ long totalUncompressedSize) {
+ // to save space we store those always positive longs in ints when they fit.
+ if (positiveLongFitsInAnInt(firstDataPage)
+ && positiveLongFitsInAnInt(dictionaryPageOffset)
+ && positiveLongFitsInAnInt(valueCount)
+ && positiveLongFitsInAnInt(totalSize)
+ && positiveLongFitsInAnInt(totalUncompressedSize)) {
+ return new IntColumnChunkMetaData(
+ path, type, codec, encodings,
+ new BooleanStatistics(),
+ firstDataPage,
+ dictionaryPageOffset,
+ valueCount,
+ totalSize,
+ totalUncompressedSize);
+ } else {
+ return new LongColumnChunkMetaData(
+ path, type, codec, encodings,
+ new BooleanStatistics(),
+ firstDataPage,
+ dictionaryPageOffset,
+ valueCount,
+ totalSize,
+ totalUncompressedSize);
+ }
+ }
+
+
+ public static ColumnChunkMetaData get(
+ ColumnPath path,
+ PrimitiveTypeName type,
+ CompressionCodecName codec,
+ Set<Encoding> encodings,
+ Statistics statistics,
+ long firstDataPage,
+ long dictionaryPageOffset,
+ long valueCount,
+ long totalSize,
+ long totalUncompressedSize) {
+ // to save space we store those always positive longs in ints when they fit.
+ if (positiveLongFitsInAnInt(firstDataPage)
+ && positiveLongFitsInAnInt(dictionaryPageOffset)
+ && positiveLongFitsInAnInt(valueCount)
+ && positiveLongFitsInAnInt(totalSize)
+ && positiveLongFitsInAnInt(totalUncompressedSize)) {
+ return new IntColumnChunkMetaData(
+ path, type, codec, encodings,
+ statistics,
+ firstDataPage,
+ dictionaryPageOffset,
+ valueCount,
+ totalSize,
+ totalUncompressedSize);
+ } else {
+ return new LongColumnChunkMetaData(
+ path, type, codec, encodings,
+ statistics,
+ firstDataPage,
+ dictionaryPageOffset,
+ valueCount,
+ totalSize,
+ totalUncompressedSize);
+ }
+ }
+
+ /**
+ * @return the offset of the first byte in the chunk
+ */
+ public long getStartingPos() {
+ long dictionaryPageOffset = getDictionaryPageOffset();
+ long firstDataPageOffset = getFirstDataPageOffset();
+ if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
+ // if there's a dictionary and it's before the first data page, start from there
+ return dictionaryPageOffset;
+ }
+ return firstDataPageOffset;
+ }
+
+ /**
+ * checks that a positive long value fits in an int.
+ * (reindexed on Integer.MIN_VALUE)
+ * @param value
+ * @return whether it fits
+ */
+ protected static boolean positiveLongFitsInAnInt(long value) {
+ return (value >= 0) && (value + Integer.MIN_VALUE <= Integer.MAX_VALUE);
+ }
+
+ // we save 3 references by storing together the column properties that have few distinct values
+ private final ColumnChunkProperties properties;
+
+ protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
+ this.properties = columnChunkProperties;
+ }
+
+ public CompressionCodecName getCodec() {
+ return properties.getCodec();
+ }
+
+ /**
+ *
+ * @return column identifier
+ */
+ public ColumnPath getPath() {
+ return properties.getPath();
+ }
+
+ /**
+ * @return type of the column
+ */
+ public PrimitiveTypeName getType() {
+ return properties.getType();
+ }
+
+ /**
+ * @return start of the column data offset
+ */
+ abstract public long getFirstDataPageOffset();
+
+ /**
+ * @return the location of the dictionary page if any
+ */
+ abstract public long getDictionaryPageOffset();
+
+ /**
+ * @return count of values in this block of the column
+ */
+ abstract public long getValueCount();
+
+ /**
+ * @return the totalUncompressedSize
+ */
+ abstract public long getTotalUncompressedSize();
+
+ /**
+ * @return the totalSize
+ */
+ abstract public long getTotalSize();
+
+ /**
+ * @return the stats for this column
+ */
+ abstract public Statistics getStatistics();
+
+ /**
+ * @return all the encodings used in this column
+ */
+ public Set<Encoding> getEncodings() {
+ return properties.getEncodings();
+ }
+
+
+ @Override
+ public String toString() {
+ return "ColumnMetaData{" + properties.toString() + ", " + getFirstDataPageOffset() + "}";
+ }
+}
+
+class IntColumnChunkMetaData extends ColumnChunkMetaData {
+
+ private final int firstDataPage;
+ private final int dictionaryPageOffset;
+ private final int valueCount;
+ private final int totalSize;
+ private final int totalUncompressedSize;
+ private final Statistics statistics;
+
+ /**
+ * @param path column identifier
+ * @param type type of the column
+ * @param codec
+ * @param encodings
+ * @param statistics
+ * @param firstDataPage
+ * @param dictionaryPageOffset
+ * @param valueCount
+ * @param totalSize
+ * @param totalUncompressedSize
+ */
+ IntColumnChunkMetaData(
+ ColumnPath path,
+ PrimitiveTypeName type,
+ CompressionCodecName codec,
+ Set<Encoding> encodings,
+ Statistics statistics,
+ long firstDataPage,
+ long dictionaryPageOffset,
+ long valueCount,
+ long totalSize,
+ long totalUncompressedSize) {
+ super(ColumnChunkProperties.get(path, type, codec, encodings));
+ this.firstDataPage = positiveLongToInt(firstDataPage);
+ this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset);
+ this.valueCount = positiveLongToInt(valueCount);
+ this.totalSize = positiveLongToInt(totalSize);
+ this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize);
+ this.statistics = statistics;
+ }
+
+ /**
+ * stores a positive long into an int (assuming it fits)
+ * @param value
+ * @return
+ */
+ private int positiveLongToInt(long value) {
+ if (!ColumnChunkMetaData.positiveLongFitsInAnInt(value)) {
+ throw new IllegalArgumentException("value should be positive and fit in an int: " + value);
+ }
+ return (int)(value + Integer.MIN_VALUE);
+ }
+
+ /**
+ * turns the int back into a positive long
+ * @param value
+ * @return
+ */
+ private long intToPositiveLong(int value) {
+ return (long)value - Integer.MIN_VALUE;
+ }
+
+ /**
+ * @return start of the column data offset
+ */
+ public long getFirstDataPageOffset() {
+ return intToPositiveLong(firstDataPage);
+ }
+
+ /**
+ * @return the location of the dictionary page if any
+ */
+ public long getDictionaryPageOffset() {
+ return intToPositiveLong(dictionaryPageOffset);
+ }
+
+ /**
+ * @return count of values in this block of the column
+ */
+ public long getValueCount() {
+ return intToPositiveLong(valueCount);
+ }
+
+ /**
+ * @return the totalUncompressedSize
+ */
+ public long getTotalUncompressedSize() {
+ return intToPositiveLong(totalUncompressedSize);
+ }
+
+ /**
+ * @return the totalSize
+ */
+ public long getTotalSize() {
+ return intToPositiveLong(totalSize);
+ }
+
+ /**
+ * @return the stats for this column
+ */
+ public Statistics getStatistics() {
+ return statistics;
+ }
+}
+class LongColumnChunkMetaData extends ColumnChunkMetaData {
+
+ private final long firstDataPageOffset;
+ private final long dictionaryPageOffset;
+ private final long valueCount;
+ private final long totalSize;
+ private final long totalUncompressedSize;
+ private final Statistics statistics;
+
+ /**
+ * @param path column identifier
+ * @param type type of the column
+ * @param codec
+ * @param encodings
+ * @param statistics
+ * @param firstDataPageOffset
+ * @param dictionaryPageOffset
+ * @param valueCount
+ * @param totalSize
+ * @param totalUncompressedSize
+ */
+ LongColumnChunkMetaData(
+ ColumnPath path,
+ PrimitiveTypeName type,
+ CompressionCodecName codec,
+ Set<Encoding> encodings,
+ Statistics statistics,
+ long firstDataPageOffset,
+ long dictionaryPageOffset,
+ long valueCount,
+ long totalSize,
+ long totalUncompressedSize) {
+ super(ColumnChunkProperties.get(path, type, codec, encodings));
+ this.firstDataPageOffset = firstDataPageOffset;
+ this.dictionaryPageOffset = dictionaryPageOffset;
+ this.valueCount = valueCount;
+ this.totalSize = totalSize;
+ this.totalUncompressedSize = totalUncompressedSize;
+ this.statistics = statistics;
+ }
+
+ /**
+ * @return start of the column data offset
+ */
+ public long getFirstDataPageOffset() {
+ return firstDataPageOffset;
+ }
+
+ /**
+ * @return the location of the dictionary page if any
+ */
+ public long getDictionaryPageOffset() {
+ return dictionaryPageOffset;
+ }
+
+ /**
+ * @return count of values in this block of the column
+ */
+ public long getValueCount() {
+ return valueCount;
+ }
+
+ /**
+ * @return the totalUncompressedSize
+ */
+ public long getTotalUncompressedSize() {
+ return totalUncompressedSize;
+ }
+
+ /**
+ * @return the totalSize
+ */
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ /**
+ * @return the stats for this column
+ */
+ public Statistics getStatistics() {
+ return statistics;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
new file mode 100644
index 0000000..5e26675
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+public class ColumnChunkProperties {
+
+ private static Canonicalizer<ColumnChunkProperties> properties = new Canonicalizer<ColumnChunkProperties>();
+
+ public static ColumnChunkProperties get(ColumnPath path, PrimitiveTypeName type, CompressionCodecName codec, Set<Encoding> encodings) {
+ return properties.canonicalize(new ColumnChunkProperties(codec, path, type, encodings));
+ }
+
+ private final CompressionCodecName codec;
+ private final ColumnPath path;
+ private final PrimitiveTypeName type;
+ private final Set<Encoding> encodings;
+
+ private ColumnChunkProperties(CompressionCodecName codec,
+ ColumnPath path,
+ PrimitiveTypeName type,
+ Set<Encoding> encodings) {
+ super();
+ this.codec = codec;
+ this.path = path;
+ this.type = type;
+ this.encodings = encodings;
+ }
+
+ public CompressionCodecName getCodec() {
+ return codec;
+ }
+
+ public ColumnPath getPath() {
+ return path;
+ }
+
+ public PrimitiveTypeName getType() {
+ return type;
+ }
+
+ public Set<Encoding> getEncodings() {
+ return encodings;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ColumnChunkProperties) {
+ ColumnChunkProperties other = (ColumnChunkProperties)obj;
+ return other.codec == codec && other.path.equals(path) && other.type == type && equals(other.encodings, encodings);
+ }
+ return false;
+ }
+
+ private boolean equals(Set<Encoding> a, Set<Encoding> b) {
+ return a.size() == b.size() && a.containsAll(b);
+ }
+
+ @Override
+ public int hashCode() {
+ return codec.hashCode() ^ path.hashCode() ^ type.hashCode() ^ Arrays.hashCode(encodings.toArray());
+ }
+
+ @Override
+ public String toString() {
+ return codec + " " + path + " " + type + " " + encodings;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
new file mode 100644
index 0000000..558bea7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException;
+
+/**
+ * Compression codecs known to parquet-hadoop. Each constant maps to:
+ * <ul>
+ * <li>a Hadoop compression codec class name (null for UNCOMPRESSED),</li>
+ * <li>the Parquet format {@link CompressionCodec} value,</li>
+ * <li>a file-name extension.</li>
+ * </ul>
+ */
+public enum CompressionCodecName {
+  UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
+  SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"),
+  GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
+  LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo");
+
+  /**
+   * Parses a codec name as read from a configuration value.
+   *
+   * @param name case-insensitive codec name (e.g. "snappy"); {@code null} means UNCOMPRESSED
+   * @return the matching constant
+   * @throws IllegalArgumentException if the name matches no constant
+   */
+  public static CompressionCodecName fromConf(String name) {
+    if (name == null) {
+      return UNCOMPRESSED;
+    }
+    // Upper-case with a locale-neutral rule. With the default locale, a Turkish JVM maps 'i'
+    // to dotted capital 'İ', so "gzip" would become "GZİP" and valueOf() would reject it.
+    return valueOf(name.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * Finds the constant whose Hadoop codec class name equals {@code clazz.getName()}.
+   *
+   * @param clazz a Hadoop codec class, or {@code null} for UNCOMPRESSED
+   * @return the matching constant
+   * @throws CompressionCodecNotSupportedException if no constant uses this class
+   */
+  public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
+    if (clazz == null) {
+      return UNCOMPRESSED;
+    }
+    String name = clazz.getName();
+    for (CompressionCodecName codec : CompressionCodecName.values()) {
+      if (name.equals(codec.getHadoopCompressionCodecClassName())) {
+        return codec;
+      }
+    }
+    throw new CompressionCodecNotSupportedException(clazz);
+  }
+
+  /**
+   * Maps a Parquet format codec to its constant.
+   *
+   * @param codec the Parquet format codec
+   * @return the matching constant
+   * @throws IllegalArgumentException if no constant maps to {@code codec}
+   */
+  public static CompressionCodecName fromParquet(CompressionCodec codec) {
+    for (CompressionCodecName codecName : CompressionCodecName.values()) {
+      if (codec.equals(codecName.parquetCompressionCodec)) {
+        return codecName;
+      }
+    }
+    throw new IllegalArgumentException("Unknown compression codec " + codec);
+  }
+
+  private final String hadoopCompressionCodecClass;
+  private final CompressionCodec parquetCompressionCodec;
+  private final String extension;
+
+  private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) {
+    this.hadoopCompressionCodecClass = hadoopCompressionCodecClass;
+    this.parquetCompressionCodec = parquetCompressionCodec;
+    this.extension = extension;
+  }
+
+  /**
+   * @return the fully-qualified Hadoop codec class name, or {@code null} for UNCOMPRESSED
+   */
+  public String getHadoopCompressionCodecClassName() {
+    return hadoopCompressionCodecClass;
+  }
+
+  /**
+   * Loads the Hadoop codec class with {@link Class#forName(String)}.
+   *
+   * @return the class, or {@code null} when there is no class name (UNCOMPRESSED) or when the
+   *     class cannot be found on the classpath (the ClassNotFoundException is deliberately
+   *     swallowed, presumably because some codec jars are optional)
+   */
+  public Class getHadoopCompressionCodecClass() {
+    String codecClassName = getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
+    }
+    try {
+      return Class.forName(codecClassName);
+    } catch (ClassNotFoundException e) {
+      return null;
+    }
+  }
+
+  /**
+   * @return the Parquet format codec value
+   */
+  public CompressionCodec getParquetCompressionCodec() {
+    return parquetCompressionCodec;
+  }
+
+  /**
+   * @return the file-name extension, e.g. ".gz"; empty for UNCOMPRESSED
+   */
+  public String getExtension() {
+    return extension;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java
new file mode 100644
index 0000000..ef073c6
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.parquet.column.Encoding;
+
+/**
+ * An immutable, ordered list of column {@link Encoding}s. Instances are created only through
+ * {@link #getEncodingList(List)}, which passes them through a shared Canonicalizer.
+ */
+public class EncodingList implements Iterable<Encoding> {
+
+  private static final Canonicalizer<EncodingList> encodingLists = new Canonicalizer<EncodingList>();
+
+  /**
+   * Returns the canonicalized instance for the given encodings.
+   *
+   * @param encodings the encodings, in order. The list is copied, so later changes to it do not
+   *     affect the returned instance.
+   * @return an EncodingList equal to {@code encodings}, possibly shared with other callers
+   *     (depending on Canonicalizer)
+   */
+  public static EncodingList getEncodingList(List<Encoding> encodings) {
+    return encodingLists.canonicalize(new EncodingList(encodings));
+  }
+
+  private final List<Encoding> encodings;
+
+  private EncodingList(List<Encoding> encodings) {
+    // Copy before wrapping. Collections.unmodifiableList() is only a read-only *view*: a caller
+    // that mutated its list afterwards would silently change equals()/hashCode() of an instance
+    // the canonicalizer may already be holding.
+    this.encodings = Collections.unmodifiableList(new java.util.ArrayList<Encoding>(encodings));
+  }
+
+  /**
+   * Element-wise, order-sensitive comparison of the encodings ({@link List#equals(Object)}).
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof EncodingList) {
+      return encodings.equals(((EncodingList) obj).encodings);
+    }
+    return false;
+  }
+
+  /**
+   * Consistent with {@link #equals(Object)}. {@link List#hashCode()} is specified as the same
+   * {@code 31 * h + elementHash} fold.
+   */
+  @Override
+  public int hashCode() {
+    return encodings.hashCode();
+  }
+
+  /**
+   * @return the encodings as an unmodifiable list
+   */
+  public List<Encoding> toList() {
+    return encodings;
+  }
+
+  /**
+   * @return an iterator over the encodings; {@code remove()} is not supported
+   */
+  @Override
+  public Iterator<Encoding> iterator() {
+    return encodings.iterator();
+  }
+
+  /**
+   * @return the number of encodings
+   */
+  public int size() {
+    return encodings.size();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
new file mode 100644
index 0000000..6135d58
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import static java.util.Collections.unmodifiableMap;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.parquet.schema.MessageType;
+
+
+/**
+ * File level metadata for a single Parquet file: the schema, the application-specific
+ * key/value metadata and the "created by" description of the writer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public final class FileMetaData implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageType schema;
+
+  private final Map<String, String> keyValueMetaData;
+
+  private final String createdBy;
+
+  /**
+   * @param schema the schema for the file; must not be null
+   * @param keyValueMetaData the app specific metadata; must not be null. It is exposed read-only
+   *     through an unmodifiable view.
+   * @param createdBy the description of the library that created the file (not null-checked)
+   */
+  public FileMetaData(MessageType schema, Map<String, String> keyValueMetaData, String createdBy) {
+    final Map<String, String> checkedKeyValues = checkNotNull(keyValueMetaData, "keyValueMetaData");
+    this.schema = checkNotNull(schema, "schema");
+    this.keyValueMetaData = unmodifiableMap(checkedKeyValues);
+    this.createdBy = createdBy;
+  }
+
+  /**
+   * @return the schema for the file
+   */
+  public MessageType getSchema() {
+    return this.schema;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("FileMetaData{schema: ")
+        .append(schema)
+        .append(", metadata: ")
+        .append(keyValueMetaData)
+        .append("}")
+        .toString();
+  }
+
+  /**
+   * @return read-only key/value metadata for extensions
+   */
+  public Map<String, String> getKeyValueMetaData() {
+    return this.keyValueMetaData;
+  }
+
+  /**
+   * @return the description of the library that created the file
+   */
+  public String getCreatedBy() {
+    return this.createdBy;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
new file mode 100644
index 0000000..677ef03
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import static java.util.Collections.unmodifiableMap;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Merged metadata when reading from multiple files.
+ * This is to allow schema evolution. The schema is the union of the files' schemas, and each
+ * key/value entry keeps the set of all values seen for that key.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class GlobalMetaData implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageType schema;
+
+  private final Map<String, Set<String>> keyValueMetaData;
+
+  private final Set<String> createdBy;
+
+  /**
+   * @param schema the union of the schemas for all the files; must not be null
+   * @param keyValueMetaData the merged app specific metadata; must not be null. It is exposed
+   *     read-only through an unmodifiable view.
+   * @param createdBy the descriptions of the libraries that created the files (not null-checked)
+   */
+  public GlobalMetaData(MessageType schema, Map<String, Set<String>> keyValueMetaData, Set<String> createdBy) {
+    final Map<String, Set<String>> checkedKeyValues = checkNotNull(keyValueMetaData, "keyValueMetaData");
+    this.schema = checkNotNull(schema, "schema");
+    this.keyValueMetaData = unmodifiableMap(checkedKeyValues);
+    this.createdBy = createdBy;
+  }
+
+  /**
+   * @return the merged schema of all the files
+   */
+  public MessageType getSchema() {
+    return this.schema;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("GlobalMetaData{schema: ")
+        .append(schema)
+        .append(", metadata: ")
+        .append(keyValueMetaData)
+        .append("}")
+        .toString();
+  }
+
+  /**
+   * @return read-only merged metadata for extensions: every key maps to all values seen for it
+   */
+  public Map<String, Set<String>> getKeyValueMetaData() {
+    return this.keyValueMetaData;
+  }
+
+  /**
+   * @return the descriptions of the libraries that created the files
+   */
+  public Set<String> getCreatedBy() {
+    return this.createdBy;
+  }
+
+  /**
+   * Collapses this merged metadata into a single-file view.
+   * <p>
+   * Each key must have exactly one value; this always holds for part files written together.
+   * The created-by string is the single description, or the set's toString() when there are
+   * several.
+   *
+   * @return the merged version of this
+   * @throws RuntimeException if some key has more than one value
+   */
+  public FileMetaData merge() {
+    final String createdByString;
+    if (createdBy.size() == 1) {
+      createdByString = createdBy.iterator().next();
+    } else {
+      createdByString = createdBy.toString();
+    }
+    final Map<String, String> mergedKeyValues = new HashMap<String, String>();
+    for (Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
+      final String key = entry.getKey();
+      final Set<String> values = entry.getValue();
+      if (values.size() > 1) {
+        throw new RuntimeException("could not merge metadata: key " + key + " has conflicting values: " + values);
+      }
+      mergedKeyValues.put(key, values.iterator().next());
+    }
+    return new FileMetaData(schema, mergedKeyValues, createdByString);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
new file mode 100644
index 0000000..d35582a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.List;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+
+/**
+ * Meta Data block stored in the footer of the file.
+ * It contains file level (Codec, Schema, ...) and block level (location, columns, record count, ...) meta data.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetMetadata {
+
+  // Shared Jackson mappers. They are final so they cannot be replaced after the pretty
+  // printer has been configured below.
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private static final ObjectMapper prettyObjectMapper = new ObjectMapper();
+  static {
+    prettyObjectMapper.configure(Feature.INDENT_OUTPUT, true);
+  }
+
+  /**
+   * Serializes the metadata to compact JSON.
+   *
+   * @param parquetMetaData the metadata to serialize
+   * @return the json representation
+   * @throws RuntimeException wrapping any Jackson/IO failure
+   */
+  public static String toJSON(ParquetMetadata parquetMetaData) {
+    return toJSON(parquetMetaData, objectMapper);
+  }
+
+  /**
+   * Serializes the metadata to indented JSON.
+   *
+   * @param parquetMetaData the metadata to serialize
+   * @return the pretty printed json representation
+   * @throws RuntimeException wrapping any Jackson/IO failure
+   */
+  public static String toPrettyJSON(ParquetMetadata parquetMetaData) {
+    return toJSON(parquetMetaData, prettyObjectMapper);
+  }
+
+  private static String toJSON(ParquetMetadata parquetMetaData, ObjectMapper mapper) {
+    StringWriter stringWriter = new StringWriter();
+    try {
+      mapper.writeValue(stringWriter, parquetMetaData);
+    } catch (IOException e) {
+      // JsonGenerationException and JsonMappingException are IOException subclasses,
+      // so this single clause covers them too.
+      throw new RuntimeException(e);
+    }
+    return stringWriter.toString();
+  }
+
+  /**
+   * Parses metadata from its JSON representation.
+   * <p>
+   * NOTE(review): the only constructor visible here takes arguments and carries no Jackson
+   * creator annotation. Confirm that Jackson can actually instantiate this class.
+   *
+   * @param json the json representation
+   * @return the parsed object
+   * @throws RuntimeException wrapping any Jackson/IO failure
+   */
+  public static ParquetMetadata fromJSON(String json) {
+    try {
+      return objectMapper.readValue(new StringReader(json), ParquetMetadata.class);
+    } catch (IOException e) {
+      // JsonParseException and JsonMappingException are IOException subclasses.
+      throw new RuntimeException(e);
+    }
+  }
+
+  private final FileMetaData fileMetaData;
+  private final List<BlockMetaData> blocks;
+
+  /**
+   * @param fileMetaData file level metadata
+   * @param blocks block level metadata (stored as given, not copied)
+   */
+  public ParquetMetadata(FileMetaData fileMetaData, List<BlockMetaData> blocks) {
+    this.fileMetaData = fileMetaData;
+    this.blocks = blocks;
+  }
+
+  /**
+   * @return block level metadata
+   */
+  public List<BlockMetaData> getBlocks() {
+    return blocks;
+  }
+
+  /**
+   * @return file level meta data
+   */
+  public FileMetaData getFileMetaData() {
+    return fileMetaData;
+  }
+
+
+  @Override
+  public String toString() {
+    return "ParquetMetaData{"+fileMetaData+", blocks: "+blocks+"}";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java
new file mode 100644
index 0000000..d319812
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * <p>
+ * Provides classes to store and use Parquet files in Hadoop.
+ * </p>
+ * <p>
+ * In a map reduce job, use {@link org.apache.parquet.hadoop.ParquetInputFormat} and
+ * {@link org.apache.parquet.hadoop.ParquetOutputFormat}.
+ * </p>
+ * <p>
+ * In a standalone java app, use {@link org.apache.parquet.hadoop.ParquetWriter} and
+ * {@link org.apache.parquet.hadoop.ParquetReader}.
+ * </p>
+ */
+package org.apache.parquet.hadoop;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
new file mode 100644
index 0000000..7f39cd7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.BadConfigurationException;
+
+/**
+ * Helpers for reading settings from a Hadoop {@link Configuration}.
+ */
+public class ConfigurationUtil {
+
+  /**
+   * Resolves a class whose fully-qualified name is stored in the configuration.
+   *
+   * @param configuration the configuration to read from
+   * @param configName the key holding the class name
+   * @param assignableFrom the type the configured class must be assignable to
+   * @return the resolved class, or {@code null} when the key is not set
+   * @throws BadConfigurationException if the class cannot be loaded or is not a subtype of
+   *     {@code assignableFrom}
+   */
+  public static Class<?> getClassFromConfig(Configuration configuration, String configName, Class<?> assignableFrom) {
+    final String className = configuration.get(configName);
+    if (className == null) {
+      return null;
+    }
+
+    final Class<?> foundClass;
+    try {
+      foundClass = configuration.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("could not instantiate class " + className + " set in job conf at " + configName, e);
+    }
+
+    if (assignableFrom.isAssignableFrom(foundClass)) {
+      return foundClass;
+    }
+    throw new BadConfigurationException("class " + className + " set in job conf at "
+        + configName + " is not a subclass of " + assignableFrom.getCanonicalName());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
new file mode 100644
index 0000000..106fb0c
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/*
+ * This is based on ContextFactory.java from hadoop-2.0.x sources.
+ */
+
+/**
+ * Utility methods to allow applications to deal with inconsistencies between
+ * MapReduce Context Objects API between hadoop-0.20 and later versions.
+ * <p>
+ * The version-specific classes, constructors, fields and methods are looked up
+ * reflectively once, in the static initializer. That initializer throws
+ * IllegalArgumentException if any lookup fails. The public helpers then invoke
+ * these handles reflectively and rethrow reflection failures as
+ * IllegalArgumentException.
+ */
+public class ContextUtil {
+
+ // true when org.apache.hadoop.mapreduce.task.JobContextImpl can be loaded;
+ // this is how the class detects the newer (0.21+ / Hadoop 2) context API.
+ private static final boolean useV21;
+
+ // Reflective handles; all are assigned in the static initializer below.
+ private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+ private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+ // null on the pre-0.21 code path.
+ private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+ private static final Constructor<?> GENERIC_COUNTER_CONSTRUCTOR;
+
+ // NOTE(review): MAP_CONTEXT_CONSTRUCTOR, MAP_CONTEXT_IMPL_CONSTRUCTOR and the
+ // four fields below are not used by any method of this class. They are still
+ // resolved, so class initialization fails if they are missing. Confirm
+ // whether anything still needs them.
+ private static final Field READER_FIELD;
+ private static final Field WRITER_FIELD;
+ private static final Field OUTER_MAP_FIELD;
+ // null on the pre-0.21 code path.
+ private static final Field WRAPPED_CONTEXT_FIELD;
+
+ private static final Method GET_CONFIGURATION_METHOD;
+ private static final Method GET_COUNTER_METHOD;
+ private static final Method INCREMENT_COUNTER_METHOD;
+
+ static {
+ // Probe for a class that only exists in the newer MapReduce API.
+ boolean v21 = true;
+ final String PACKAGE = "org.apache.hadoop.mapreduce";
+ try {
+ Class.forName(PACKAGE + ".task.JobContextImpl");
+ } catch (ClassNotFoundException cnfe) {
+ v21 = false;
+ }
+ useV21 = v21;
+ Class<?> jobContextCls;
+ Class<?> taskContextCls;
+ Class<?> taskIOContextCls;
+ Class<?> mapCls;
+ Class<?> mapContextCls;
+ Class<?> innerMapContextCls;
+ Class<?> genericCounterCls;
+ try {
+ if (v21) {
+ // Newer API: instantiate the *Impl classes and the WrappedMapper context.
+ jobContextCls =
+ Class.forName(PACKAGE+".task.JobContextImpl");
+ taskContextCls =
+ Class.forName(PACKAGE+".task.TaskAttemptContextImpl");
+ taskIOContextCls =
+ Class.forName(PACKAGE+".task.TaskInputOutputContextImpl");
+ mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+ mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
+ innerMapContextCls =
+ Class.forName(PACKAGE+".lib.map.WrappedMapper$Context");
+ genericCounterCls = Class.forName(PACKAGE+".counters.GenericCounter");
+ } else {
+ // Older API: the context types themselves are instantiated, and the
+ // counter comes from the old mapred Counters.
+ jobContextCls =
+ Class.forName(PACKAGE+".JobContext");
+ taskContextCls =
+ Class.forName(PACKAGE+".TaskAttemptContext");
+ taskIOContextCls =
+ Class.forName(PACKAGE+".TaskInputOutputContext");
+ mapContextCls = Class.forName(PACKAGE + ".MapContext");
+ mapCls = Class.forName(PACKAGE + ".Mapper");
+ innerMapContextCls =
+ Class.forName(PACKAGE+".Mapper$Context");
+ genericCounterCls =
+ Class.forName("org.apache.hadoop.mapred.Counters$Counter");
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Can't find class", e);
+ }
+ try {
+ JOB_CONTEXT_CONSTRUCTOR =
+ jobContextCls.getConstructor(Configuration.class, JobID.class);
+ JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ TASK_CONTEXT_CONSTRUCTOR =
+ taskContextCls.getConstructor(Configuration.class,
+ TaskAttemptID.class);
+ TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ // getDeclaredConstructor + setAccessible: this constructor may be non-public.
+ GENERIC_COUNTER_CONSTRUCTOR =
+ genericCounterCls.getDeclaredConstructor(String.class,
+ String.class,
+ Long.TYPE);
+ GENERIC_COUNTER_CONSTRUCTOR.setAccessible(true);
+
+ if (useV21) {
+ MAP_CONTEXT_CONSTRUCTOR =
+ innerMapContextCls.getConstructor(mapCls,
+ MapContext.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR =
+ mapContextCls.getDeclaredConstructor(Configuration.class,
+ TaskAttemptID.class,
+ RecordReader.class,
+ RecordWriter.class,
+ OutputCommitter.class,
+ StatusReporter.class,
+ InputSplit.class);
+ MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
+ WRAPPED_CONTEXT_FIELD =
+ innerMapContextCls.getDeclaredField("mapContext");
+ WRAPPED_CONTEXT_FIELD.setAccessible(true);
+ // getCounter(String, String) is looked up on TaskAttemptContext first.
+ // If that lookup throws, it falls back to TaskInputOutputContext.
+ Method get_counter_method;
+ try {
+ get_counter_method = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter", String.class,
+ String.class);
+ } catch (Exception e) {
+ get_counter_method = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter",
+ String.class, String.class);
+ }
+ GET_COUNTER_METHOD=get_counter_method;
+ } else {
+ MAP_CONTEXT_CONSTRUCTOR =
+ innerMapContextCls.getConstructor(mapCls,
+ Configuration.class,
+ TaskAttemptID.class,
+ RecordReader.class,
+ RecordWriter.class,
+ OutputCommitter.class,
+ StatusReporter.class,
+ InputSplit.class);
+ // No MapContextImpl / wrapped map context in the older API.
+ MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
+ WRAPPED_CONTEXT_FIELD = null;
+ GET_COUNTER_METHOD=taskIOContextCls.getMethod("getCounter", String.class, String.class);
+ }
+ MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+ READER_FIELD = mapContextCls.getDeclaredField("reader");
+ READER_FIELD.setAccessible(true);
+ WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
+ WRITER_FIELD.setAccessible(true);
+ // "this$0" is the compiler-generated reference from the inner Context
+ // class to its enclosing mapper instance.
+ OUTER_MAP_FIELD = innerMapContextCls.getDeclaredField("this$0");
+ OUTER_MAP_FIELD.setAccessible(true);
+ GET_CONFIGURATION_METHOD = Class.forName(PACKAGE+".JobContext")
+ .getMethod("getConfiguration");
+ INCREMENT_COUNTER_METHOD = Class.forName(PACKAGE+".Counter")
+ .getMethod("increment", Long.TYPE);
+ } catch (SecurityException e) {
+ throw new IllegalArgumentException("Can't run constructor ", e);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("Can't find constructor ", e);
+ } catch (NoSuchFieldException e) {
+ throw new IllegalArgumentException("Can't find field ", e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Can't find class", e);
+ }
+ }
+
+ /**
+ * Creates a JobContext from a Configuration and jobId, using the constructor
+ * appropriate for the Hadoop version. <code>jobId</code> could be null.
+ *
+ * @param conf the job configuration
+ * @param jobId the job id; may be null
+ * @return a new JobContext instance
+ * @throws IllegalArgumentException if the reflective construction fails
+ */
+ public static JobContext newJobContext(Configuration conf, JobID jobId) {
+ try {
+ return (JobContext)
+ JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, jobId);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't instantiate JobContext", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't instantiate JobContext", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't instantiate JobContext", e);
+ }
+ }
+
+ /**
+ * Creates a TaskAttemptContext from a Configuration and task attempt id,
+ * using the constructor appropriate for the Hadoop version.
+ *
+ * @param conf the task configuration
+ * @param taskAttemptId the task attempt id
+ * @return a new TaskAttemptContext instance
+ * @throws IllegalArgumentException if the reflective construction fails
+ */
+ public static TaskAttemptContext newTaskAttemptContext(
+ Configuration conf, TaskAttemptID taskAttemptId) {
+ try {
+ return (TaskAttemptContext)
+ TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, taskAttemptId);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", e);
+ }
+ }
+
+ /**
+ * Creates a standalone counter.
+ *
+ * @param name the counter name
+ * @param displayName the counter display name
+ * @param value the initial value
+ * @return with Hadoop 2 : <code>new GenericCounter(args)</code>,<br>
+ * with Hadoop 1 : <code>new Counter(args)</code>
+ * @throws IllegalArgumentException if the reflective construction fails
+ */
+ public static Counter newGenericCounter(String name, String displayName, long value) {
+ try {
+ return (Counter)
+ GENERIC_COUNTER_CONSTRUCTOR.newInstance(name, displayName, value);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Can't instantiate Counter", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't instantiate Counter", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't instantiate Counter", e);
+ }
+ }
+
+ /**
+ * Invoke getConfiguration() method on JobContext. Works with both
+ * Hadoop 1 and 2.
+ *
+ * @param context the job context
+ * @return the context's configuration
+ * @throws IllegalArgumentException if the reflective call fails
+ */
+ public static Configuration getConfiguration(JobContext context) {
+ try {
+ return (Configuration) GET_CONFIGURATION_METHOD.invoke(context);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't invoke method", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't invoke method", e);
+ }
+ }
+
+ /**
+ * Calls getCounter(groupName, counterName) on the context reflectively.
+ *
+ * @return the counter returned by the context
+ * @throws IllegalArgumentException if the reflective call fails
+ */
+ public static Counter getCounter(TaskInputOutputContext context,
+ String groupName, String counterName) {
+ return (Counter) invoke(GET_COUNTER_METHOD, context, groupName, counterName);
+ }
+
+ /**
+ * Invokes a method and rethrows any exception as runtime exceptions.
+ * Both IllegalAccessException and InvocationTargetException are wrapped in
+ * IllegalArgumentException, with the method name in the message.
+ */
+ private static Object invoke(Method method, Object obj, Object... args) {
+ try {
+ return method.invoke(obj, args);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
+ }
+ }
+
+ /**
+ * Calls Counter.increment(long) reflectively. The call is reflective
+ * presumably to stay binary-compatible across Hadoop versions where Counter
+ * differs (class vs interface); confirm.
+ *
+ * @param counter the counter to increment
+ * @param increment the amount to add
+ */
+ public static void incrementCounter(Counter counter, long increment) {
+ invoke(INCREMENT_COUNTER_METHOD, counter, increment);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
new file mode 100644
index 0000000..1817bb2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class HiddenFileFilter implements PathFilter {
+ public static final HiddenFileFilter INSTANCE = new HiddenFileFilter();
+
+ private HiddenFileFilter() {}
+
+ @Override
+ public boolean accept(Path p) {
+ return !p.getName().startsWith("_") && !p.getName().startsWith(".");
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
new file mode 100644
index 0000000..ec413ac
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.Closeables;
+import org.apache.parquet.Log;
+
+/**
+ * Serialization utils copied from:
+ * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/util/HadoopUtils.java
+ *
+ * TODO: Refactor elephant-bird so that we can depend on utils like this without extra baggage.
+ */
+public final class SerializationUtil {
+ private static final Log LOG = Log.getLog(SerializationUtil.class);
+
+ private SerializationUtil() { }
+
+ /**
+ * Reads an object (that was written using
+ * {@link #writeObjectToConfAsBase64}) from a configuration.
+ *
+ * @param key for the configuration
+ * @param conf to read from
+ * @return the read object, or null if key is not present in conf
+ * @throws IOException
+ */
+ public static void writeObjectToConfAsBase64(String key, Object obj, Configuration conf) throws IOException {
+ ByteArrayOutputStream baos = null;
+ GZIPOutputStream gos = null;
+ ObjectOutputStream oos = null;
+
+ try {
+ baos = new ByteArrayOutputStream();
+ gos = new GZIPOutputStream(baos);
+ oos = new ObjectOutputStream(gos);
+ oos.writeObject(obj);
+ } finally {
+ Closeables.close(oos);
+ Closeables.close(gos);
+ Closeables.close(baos);
+ }
+
+ conf.set(key, new String(Base64.encodeBase64(baos.toByteArray()), "UTF-8"));
+ }
+
+ /**
+ * Reads an object (that was written using
+ * {@link #writeObjectToConfAsBase64}) from a configuration
+ *
+ * @param key for the configuration
+ * @param conf to read from
+ * @return the read object, or null if key is not present in conf
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T readObjectFromConfAsBase64(String key, Configuration conf) throws IOException {
+ String b64 = conf.get(key);
+ if (b64 == null) {
+ return null;
+ }
+
+ byte[] bytes = Base64.decodeBase64(b64.getBytes("UTF-8"));
+
+ ByteArrayInputStream bais = null;
+ GZIPInputStream gis = null;
+ ObjectInputStream ois = null;
+
+ try {
+ bais = new ByteArrayInputStream(bytes);
+ gis = new GZIPInputStream(bais);
+ ois = new ObjectInputStream(gis);
+ return (T) ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not read object from config with key " + key, e);
+ } catch (ClassCastException e) {
+ throw new IOException("Couldn't cast object read from config with key " + key, e);
+ } finally {
+ Closeables.close(ois);
+ Closeables.close(gis);
+ Closeables.close(bais);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
new file mode 100644
index 0000000..e537783
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.parquet.hadoop.util.counters.mapred.MapRedCounterLoader;
+import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader;
+
+/**
+ * Encapsulate counter operations, compatible with Hadoop1/2, mapred/mapreduce API
+ *
+ * @author Tianshuo Deng
+ */
+public class BenchmarkCounter {
+
+ private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+ private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+ private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+ private static final String COUNTER_GROUP_NAME = "parquet";
+ private static final String BYTES_READ_COUNTER_NAME = "bytesread";
+ private static final String BYTES_TOTAL_COUNTER_NAME = "bytestotal";
+ private static final String TIME_READ_COUNTER_NAME = "timeread";
+ private static ICounter bytesReadCounter = new NullCounter();
+ private static ICounter totalBytesCounter = new NullCounter();
+ private static ICounter timeCounter = new NullCounter();
+ private static CounterLoader counterLoader;
+
+ /**
+ * Init counters in hadoop's mapreduce API, support both 1.x and 2.x
+ *
+ * @param context
+ */
+ public static void initCounterFromContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+ counterLoader = new MapReduceCounterLoader(context);
+ loadCounters();
+ }
+
+ /**
+ * Init counters in hadoop's mapred API, which is used by cascading and Hive.
+ *
+ * @param reporter
+ * @param configuration
+ */
+ public static void initCounterFromReporter(Reporter reporter, Configuration configuration) {
+ counterLoader = new MapRedCounterLoader(reporter, configuration);
+ loadCounters();
+ }
+
+ private static void loadCounters() {
+ bytesReadCounter = getCounterWhenFlagIsSet(COUNTER_GROUP_NAME, BYTES_READ_COUNTER_NAME, ENABLE_BYTES_READ_COUNTER);
+ totalBytesCounter = getCounterWhenFlagIsSet(COUNTER_GROUP_NAME, BYTES_TOTAL_COUNTER_NAME, ENABLE_BYTES_TOTAL_COUNTER);
+ timeCounter = getCounterWhenFlagIsSet(COUNTER_GROUP_NAME, TIME_READ_COUNTER_NAME, ENABLE_TIME_READ_COUNTER);
+ }
+
+ private static ICounter getCounterWhenFlagIsSet(String groupName, String counterName, String counterFlag) {
+ return counterLoader.getCounterByNameAndFlag(groupName, counterName, counterFlag);
+ }
+
+ public static void incrementTotalBytes(long val) {
+ totalBytesCounter.increment(val);
+ }
+
+ public static long getTotalBytes() {
+ return totalBytesCounter.getCount();
+ }
+
+ public static void incrementBytesRead(long val) {
+ bytesReadCounter.increment(val);
+ }
+
+ public static long getBytesRead() {
+ return bytesReadCounter.getCount();
+ }
+
+ public static void incrementTime(long val) {
+ timeCounter.increment(val);
+ }
+
+ public static long getTime() {
+ return timeCounter.getCount();
+ }
+
+ public static class NullCounter implements ICounter {
+ @Override
+ public void increment(long val) {
+ //do nothing
+ }
+
+ @Override
+ public long getCount() {
+ return 0;
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java
new file mode 100644
index 0000000..0b9f92f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters;
+
+/**
+ * Factory interface for CounterLoaders, will load the counter according to groupName, counterName,
+ * and if in the configuration, flag with name counterFlag is false, the counter will not be loaded
+ * @author Tianshuo Deng
+ */
+public interface CounterLoader {
+ public ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag);
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java
new file mode 100644
index 0000000..c10b8a8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters;
+
/**
 * Minimal counter abstraction that hides the difference between Hadoop's mapred and
 * mapreduce counter types.
 *
 * @author Tianshuo Deng
 */
public interface ICounter {
  /**
   * Adds {@code val} to the counter.
   *
   * @param val the amount to add
   */
  void increment(long val);

  /**
   * Returns the counter's current value.
   *
   * @return the current count
   */
  long getCount();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java
new file mode 100644
index 0000000..4377d44
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapred;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Adapt a mapred counter to ICounter
+ * @author Tianshuo Deng
+ */
+public class MapRedCounterAdapter implements ICounter {
+ private org.apache.hadoop.mapred.Counters.Counter adaptee;
+
+ public MapRedCounterAdapter(Counters.Counter adaptee) {
+ this.adaptee = adaptee;
+ }
+
+ @Override
+ public void increment(long val) {
+ adaptee.increment(val);
+ }
+
+ @Override
+ public long getCount() {
+ return adaptee.getCounter();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java
new file mode 100644
index 0000000..0e5a32d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.hadoop.util.counters.CounterLoader;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Concrete factory for counters in mapred API,
+ * get a counter using mapred API when the corresponding flag is set, otherwise return a NullCounter
+ * @author Tianshuo Deng
+ */
+public class MapRedCounterLoader implements CounterLoader {
+ private Reporter reporter;
+ private Configuration conf;
+
+ public MapRedCounterLoader(Reporter reporter, Configuration conf) {
+ this.reporter = reporter;
+ this.conf = conf;
+ }
+
+ @Override
+ public ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag) {
+ if (conf.getBoolean(counterFlag, true)) {
+ Counters.Counter counter = reporter.getCounter(groupName, counterName);
+ if (counter != null) {
+ return new MapRedCounterAdapter(reporter.getCounter(groupName, counterName));
+ }
+ }
+ return new BenchmarkCounter.NullCounter();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java
new file mode 100644
index 0000000..1339977
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapreduce;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Adapt a mapreduce counter to ICounter
+ * @author Tianshuo Deng
+ */
+public class MapReduceCounterAdapter implements ICounter {
+ private Counter adaptee;
+
+ public MapReduceCounterAdapter(Counter adaptee) {
+ this.adaptee = adaptee;
+ }
+
+ @Override
+ public void increment(long val) {
+ ContextUtil.incrementCounter(adaptee, val);
+ }
+
+ @Override
+ public long getCount() {
+ return adaptee.getValue(); //To change body of implemented methods use File | Settings | File Templates.
+ }
+}