Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:06 UTC
[09/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
deleted file mode 100644
index 144515c..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import parquet.Ints;
-import parquet.Log;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ParquetProperties;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreV1;
-import parquet.column.impl.ColumnWriteStoreV2;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.schema.MessageType;
-
-class InternalParquetRecordWriter<T> {
- private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class);
-
- private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
- private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
- private final ParquetFileWriter parquetFileWriter;
- private final WriteSupport<T> writeSupport;
- private final MessageType schema;
- private final Map<String, String> extraMetaData;
- private final long rowGroupSize;
- private long rowGroupSizeThreshold;
- private final int pageSize;
- private final BytesCompressor compressor;
- private final boolean validating;
- private final ParquetProperties parquetProperties;
-
- private long recordCount = 0;
- private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
-
- private ColumnWriteStore columnStore;
- private ColumnChunkPageWriteStore pageStore;
-
-
- /**
- * @param parquetFileWriter the file to write to
- * @param writeSupport the class to convert incoming records
- * @param schema the schema of the records
- * @param extraMetaData extra meta data to write in the footer of the file
- * @param rowGroupSize the size of a block in the file (this will be approximate)
- * @param compressor the codec used to compress
- */
- public InternalParquetRecordWriter(
- ParquetFileWriter parquetFileWriter,
- WriteSupport<T> writeSupport,
- MessageType schema,
- Map<String, String> extraMetaData,
- long rowGroupSize,
- int pageSize,
- BytesCompressor compressor,
- int dictionaryPageSize,
- boolean enableDictionary,
- boolean validating,
- WriterVersion writerVersion) {
- this.parquetFileWriter = parquetFileWriter;
- this.writeSupport = checkNotNull(writeSupport, "writeSupport");
- this.schema = schema;
- this.extraMetaData = extraMetaData;
- this.rowGroupSize = rowGroupSize;
- this.rowGroupSizeThreshold = rowGroupSize;
- this.pageSize = pageSize;
- this.compressor = compressor;
- this.validating = validating;
- this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
- initStore();
- }
-
- private void initStore() {
- pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
- columnStore = parquetProperties.newColumnWriteStore(
- schema,
- pageStore,
- pageSize);
- MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
- writeSupport.prepareForWrite(columnIO.getRecordWriter(columnStore));
- }
-
- public void close() throws IOException, InterruptedException {
- flushRowGroupToStore();
- FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
- Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
- finalMetadata.putAll(finalWriteContext.getExtraMetaData());
- parquetFileWriter.end(finalMetadata);
- }
-
- public void write(T value) throws IOException, InterruptedException {
- writeSupport.write(value);
- ++ recordCount;
- checkBlockSizeReached();
- }
-
- private void checkBlockSizeReached() throws IOException {
- if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
- long memSize = columnStore.getBufferedSize();
- if (memSize > rowGroupSizeThreshold) {
- LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, rowGroupSizeThreshold, recordCount));
- flushRowGroupToStore();
- initStore();
- recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
- } else {
- float recordSize = (float) memSize / recordCount;
- recordCountForNextMemCheck = min(
- max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(rowGroupSizeThreshold / recordSize)) / 2), // will check halfway
- recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
- );
- if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
- }
- }
- }
-
- private void flushRowGroupToStore()
- throws IOException {
- LOG.info(format("Flushing mem columnStore to file. allocated memory: %,d", columnStore.getAllocatedSize()));
- if (columnStore.getAllocatedSize() > 3 * (long)rowGroupSizeThreshold) {
- LOG.warn("Too much memory used: " + columnStore.memUsageString());
- }
-
- if (recordCount > 0) {
- parquetFileWriter.startBlock(recordCount);
- columnStore.flush();
- pageStore.flushToFileWriter(parquetFileWriter);
- recordCount = 0;
- parquetFileWriter.endBlock();
- }
-
- columnStore = null;
- pageStore = null;
- }
-
- long getRowGroupSizeThreshold() {
- return rowGroupSizeThreshold;
- }
-
- void setRowGroupSizeThreshold(long rowGroupSizeThreshold) {
- this.rowGroupSizeThreshold = rowGroupSizeThreshold;
- }
-
- MessageType getSchema() {
- return this.schema;
- }
-}
\ No newline at end of file
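The adaptive flush check in checkBlockSizeReached() above can be reproduced as a
small standalone sketch. The class and method names below are illustrative only
and are not part of parquet-mr; the arithmetic mirrors the deleted code: memory
is inspected only every so many records, and the next check is scheduled roughly
halfway to the estimated row-group boundary.

// Standalone sketch of the checkBlockSizeReached() heuristic above.
// Names are illustrative; this class is not part of the parquet-mr API.
public class RowGroupCheckHeuristic {
  private static final int MIN_RECORDS_FOR_CHECK = 100;
  private static final int MAX_RECORDS_FOR_CHECK = 10000;

  /**
   * Returns the record count at which buffered memory should be checked next,
   * given the current record count, the buffered size and the row group threshold.
   */
  public static long nextCheck(long recordCount, long bufferedBytes, long rowGroupThreshold) {
    if (bufferedBytes > rowGroupThreshold) {
      // A flush would happen here; restart with a conservative check point.
      return Math.min(Math.max(MIN_RECORDS_FOR_CHECK, recordCount / 2), MAX_RECORDS_FOR_CHECK);
    }
    float recordSize = (float) bufferedBytes / recordCount;
    return Math.min(
        Math.max(MIN_RECORDS_FOR_CHECK, (recordCount + (long) (rowGroupThreshold / recordSize)) / 2),
        recordCount + MAX_RECORDS_FOR_CHECK);
  }

  public static void main(String[] args) {
    // 1000 records buffered in ~8 MB with a 128 MB threshold: next check near record 8500.
    System.out.println(nextCheck(1000, 8L * 1024 * 1024, 128L * 1024 * 1024));
  }
}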
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java b/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
deleted file mode 100644
index 2d6070f..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/LruCache.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import parquet.Log;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A basic implementation of an LRU cache. Besides evicting the least recently
- * used entries (either based on insertion or access order), this class also
- * checks for "stale" entries as entries are inserted or retrieved (note that
- * "staleness" is defined by the entries themselves; see
- * {@link parquet.hadoop.LruCache.Value}).
- *
- * @param <K> The key type. Acts as the key in a {@link java.util.LinkedHashMap}
- * @param <V> The value type. Must extend {@link parquet.hadoop.LruCache.Value}
- * so that the "staleness" of the value can be easily determined.
- */
-final class LruCache<K, V extends LruCache.Value<K, V>> {
- private static final Log LOG = Log.getLog(LruCache.class);
-
- private static final float DEFAULT_LOAD_FACTOR = 0.75f;
-
- private final LinkedHashMap<K, V> cacheMap;
-
- /**
- * Constructs an access-order based LRU cache with {@code maxSize} entries.
- * @param maxSize The maximum number of entries to store in the cache.
- */
- public LruCache(final int maxSize) {
- this(maxSize, DEFAULT_LOAD_FACTOR, true);
- }
-
- /**
- * Constructs an LRU cache.
- *
- * @param maxSize The maximum number of entries to store in the cache.
- * @param loadFactor Used to determine the initial capacity.
- * @param accessOrder the ordering mode - {@code true} for access-order,
- * {@code false} for insertion-order
- */
- public LruCache(final int maxSize, final float loadFactor, final boolean accessOrder) {
- int initialCapacity = Math.round(maxSize / loadFactor);
- cacheMap =
- new LinkedHashMap<K, V>(initialCapacity, loadFactor, accessOrder) {
- @Override
- public boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
- boolean result = size() > maxSize;
- if (result) {
- if (Log.DEBUG) {
- LOG.debug("Removing eldest entry in cache: "
- + eldest.getKey());
- }
- }
- return result;
- }
- };
- }
-
- /**
- * Removes the mapping for the specified key from this cache if present.
- * @param key key whose mapping is to be removed from the cache
- * @return the previous value associated with key, or null if there was no
- * mapping for key.
- */
- public V remove(final K key) {
- V oldValue = cacheMap.remove(key);
- if (oldValue != null) {
- if (Log.DEBUG) {
- LOG.debug("Removed cache entry for '" + key + "'");
- }
- }
- return oldValue;
- }
-
- /**
- * Associates the specified value with the specified key in this cache. The
- * value is only inserted if it is not null and it is considered current. If
- * the cache previously contained a mapping for the key, the old value is
- * replaced only if the new value is "newer" than the old one.
- * @param key key with which the specified value is to be associated
- * @param newValue value to be associated with the specified key
- */
- public void put(final K key, final V newValue) {
- if (newValue == null || !newValue.isCurrent(key)) {
- if (Log.WARN) {
- LOG.warn("Ignoring new cache entry for '" + key + "' because it is "
- + (newValue == null ? "null" : "not current"));
- }
- return;
- }
-
- V oldValue = cacheMap.get(key);
- if (oldValue != null && oldValue.isNewerThan(newValue)) {
- if (Log.WARN) {
- LOG.warn("Ignoring new cache entry for '" + key + "' because "
- + "existing cache entry is newer");
- }
- return;
- }
-
- // no existing value or new value is newer than old value
- oldValue = cacheMap.put(key, newValue);
- if (Log.DEBUG) {
- if (oldValue == null) {
- LOG.debug("Added new cache entry for '" + key + "'");
- } else {
- LOG.debug("Overwrote existing cache entry for '" + key + "'");
- }
- }
- }
-
- /**
- * Removes all of the mappings from this cache. The cache will be empty
- * after this call returns.
- */
- public void clear() {
- cacheMap.clear();
- }
-
- /**
- * Returns the value to which the specified key is mapped, or null if 1) the
- * value is not current or 2) this cache contains no mapping for the key.
- * @param key the key whose associated value is to be returned
- * @return the value to which the specified key is mapped, or null if 1) the
- * value is not current or 2) this cache contains no mapping for the key
- */
- public V getCurrentValue(final K key) {
- V value = cacheMap.get(key);
- if (Log.DEBUG) {
- LOG.debug("Value for '" + key + "' " + (value == null ? "not " : "")
- + "in cache");
- }
- if (value != null && !value.isCurrent(key)) {
- // value is not current; remove it and return null
- remove(key);
- return null;
- }
-
- return value;
- }
-
- /**
- * Returns the number of key-value mappings in this cache.
- * @return the number of key-value mappings in this cache.
- */
- public int size() {
- return cacheMap.size();
- }
-
- /**
- * {@link parquet.hadoop.LruCache} expects all values to follow this
- * interface so the cache can determine 1) whether values are current (e.g.
- * the referenced data has not been modified/updated in such a way that the
- * value is no longer useful) and 2) whether a value is strictly "newer"
- * than another value.
- *
- * @param <K> The key type.
- * @param <V> Provides a bound for the {@link #isNewerThan(V)} method
- */
- interface Value<K, V> {
- /**
-   * Is the value still current (e.g. has the referenced data been
-   * modified/updated in such a way that the value is no longer useful)?
-   * @param key the key associated with this value
-   * @return {@code true} if the value is still current, {@code false} if the
-   * value is no longer useful
- */
- boolean isCurrent(K key);
-
- /**
- * Compares this value with the specified value to check for relative age.
- * @param otherValue the value to be compared.
-   * @return {@code true} if this value is strictly newer than the other value,
-   * {@code false} if it is older or just as new as the other value.
- */
- boolean isNewerThan(V otherValue);
- }
-
-}
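As a usage illustration of the Value contract above (hypothetical code, not part
of this commit), a value keyed by a file can declare itself stale once the file's
modification time changes. Since LruCache is package-private, such a caller would
have to live in the parquet.hadoop package.

package parquet.hadoop;

import java.io.File;

// Hypothetical Value implementation: the cached payload stays "current" only
// while the underlying file's modification time is unchanged.
final class TimestampedValue implements LruCache.Value<File, TimestampedValue> {
  private final long modificationTime;
  private final String payload;

  TimestampedValue(File file, String payload) {
    this.modificationTime = file.lastModified();
    this.payload = payload;
  }

  String getPayload() {
    return payload;
  }

  @Override
  public boolean isCurrent(File key) {
    // Stale as soon as the file has been rewritten.
    return key.lastModified() == modificationTime;
  }

  @Override
  public boolean isNewerThan(TimestampedValue otherValue) {
    return modificationTime > otherValue.modificationTime;
  }
}

Such a value can then be stored in a new LruCache<File, TimestampedValue>(100);
getCurrentValue() will evict it and return null as soon as the file changes.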
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
deleted file mode 100644
index 9724868..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/MemoryManager.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import parquet.Log;
-import parquet.ParquetRuntimeException;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implements a memory manager that keeps a global context of how many Parquet
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- *
- * This class balances the allocation size of each writer by resizing them
- * proportionally. When the sum of each writer's allocation size is less than
- * the total memory pool, the original values are kept. When the sum exceeds
- * the pool, each writer's allocation size is scaled down by the same ratio.
- */
-public class MemoryManager {
- private static final Log LOG = Log.getLog(MemoryManager.class);
- static final float DEFAULT_MEMORY_POOL_RATIO = 0.95f;
- static final long DEFAULT_MIN_MEMORY_ALLOCATION = 1 * 1024 * 1024; // 1MB
- private final float memoryPoolRatio;
-
- private final long totalMemoryPool;
- private final long minMemoryAllocation;
- private final Map<InternalParquetRecordWriter, Long> writerList = new
- HashMap<InternalParquetRecordWriter, Long>();
-
- public MemoryManager(float ratio, long minAllocation) {
- checkRatio(ratio);
-
- memoryPoolRatio = ratio;
- minMemoryAllocation = minAllocation;
- totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax
- () * ratio);
- LOG.debug(String.format("Allocated total memory pool is: %,d", totalMemoryPool));
- }
-
- private void checkRatio(float ratio) {
- if (ratio <= 0 || ratio > 1) {
- throw new IllegalArgumentException("The configured memory pool ratio " + ratio + " is " +
- "not between 0 and 1.");
- }
- }
-
- /**
- * Add a new writer and its memory allocation to the memory manager.
- * @param writer the new created writer
- * @param allocation the requested buffer size
- */
- synchronized void addWriter(InternalParquetRecordWriter writer, Long allocation) {
- Long oldValue = writerList.get(writer);
- if (oldValue == null) {
- writerList.put(writer, allocation);
- } else {
- throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
- "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
- "the writer: " + writer);
- }
- updateAllocation();
- }
-
- /**
- * Remove the given writer from the memory manager.
- * @param writer the writer that has been closed
- */
- synchronized void removeWriter(InternalParquetRecordWriter writer) {
- if (writerList.containsKey(writer)) {
- writerList.remove(writer);
- }
- if (!writerList.isEmpty()) {
- updateAllocation();
- }
- }
-
- /**
- * Update the allocated size of each writer based on the current allocations and pool size.
- */
- private void updateAllocation() {
- long totalAllocations = 0;
- double scale;
- for (Long allocation : writerList.values()) {
- totalAllocations += allocation;
- }
- if (totalAllocations <= totalMemoryPool) {
- scale = 1.0;
- } else {
- scale = (double) totalMemoryPool / totalAllocations;
- LOG.warn(String.format(
- "Total allocation exceeds %.2f%% (%,d bytes) of heap memory\n" +
- "Scaling row group sizes to %.2f%% for %d writers",
- 100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
- }
-
- int maxColCount = 0;
- for (InternalParquetRecordWriter w : writerList.keySet()) {
- maxColCount = Math.max(w.getSchema().getColumns().size(), maxColCount);
- }
-
- for (Map.Entry<InternalParquetRecordWriter, Long> entry : writerList.entrySet()) {
- long newSize = (long) Math.floor(entry.getValue() * scale);
- if(scale < 1.0 && minMemoryAllocation > 0 && newSize < minMemoryAllocation) {
- throw new ParquetRuntimeException(String.format("New Memory allocation %d bytes" +
- " is smaller than the minimum allocation size of %d bytes.",
- newSize, minMemoryAllocation)){};
- }
- entry.getKey().setRowGroupSizeThreshold(newSize);
- LOG.debug(String.format("Adjust block size from %,d to %,d for writer: %s",
- entry.getValue(), newSize, entry.getKey()));
- }
- }
-
- /**
- * Get the total memory pool size that is available for writers.
- * @return the number of bytes in the memory pool
- */
- long getTotalMemoryPool() {
- return totalMemoryPool;
- }
-
- /**
- * Get the writers list
- * @return the writers in this memory manager
- */
- Map<InternalParquetRecordWriter, Long> getWriterList() {
- return writerList;
- }
-
- /**
- * Get the ratio of memory allocated for all the writers.
- * @return the memory pool ratio
- */
- float getMemoryPoolRatio() {
- return memoryPoolRatio;
- }
-}
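The scaling performed by updateAllocation() above boils down to a single ratio
applied to every writer. The following standalone sketch (hypothetical names,
plain strings instead of writer instances) reproduces that arithmetic.

import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch of the MemoryManager scaling rule: when the requested
// allocations exceed the pool, every writer is scaled down by the same ratio.
public class AllocationScalingSketch {
  public static Map<String, Long> scale(Map<String, Long> requested, long poolBytes) {
    long total = 0;
    for (Long r : requested.values()) {
      total += r;
    }
    double ratio = (total <= poolBytes) ? 1.0 : (double) poolBytes / total;
    Map<String, Long> adjusted = new LinkedHashMap<String, Long>();
    for (Map.Entry<String, Long> e : requested.entrySet()) {
      adjusted.put(e.getKey(), (long) Math.floor(e.getValue() * ratio));
    }
    return adjusted;
  }

  public static void main(String[] args) {
    Map<String, Long> requested = new LinkedHashMap<String, Long>();
    requested.put("writer-1", 128L * 1024 * 1024); // 128 MB requested
    requested.put("writer-2", 128L * 1024 * 1024); // 128 MB requested
    // With a 192 MB pool the ratio is 0.75, so each row group is scaled to 96 MB.
    System.out.println(scale(requested, 192L * 1024 * 1024));
  }
}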
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
deleted file mode 100644
index 5b0b341..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ /dev/null
@@ -1,782 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.DEBUG;
-import static parquet.bytes.BytesUtils.readIntLittleEndian;
-import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
-import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
-import static parquet.hadoop.ParquetFileWriter.MAGIC;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.SequenceInputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageReadStore;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.format.DataPageHeader;
-import parquet.format.DataPageHeaderV2;
-import parquet.format.DictionaryPageHeader;
-import parquet.format.PageHeader;
-import parquet.format.Util;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
-import parquet.hadoop.CodecFactory.BytesDecompressor;
-import parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.hadoop.util.HiddenFileFilter;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ParquetDecodingException;
-
-/**
- * Internal implementation of the Parquet file reader as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileReader implements Closeable {
-
- private static final Log LOG = Log.getLog(ParquetFileReader.class);
-
- public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
-
- private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
-
- /**
- * for files provided, check if there's a summary file.
- * If a summary file is found it is used otherwise the file footer is used.
- * @param configuration the hadoop conf to connect to the file system;
- * @param partFiles the part files to read
- * @return the footers for those files using the summary file if possible.
- * @throws IOException
- */
- @Deprecated
- public static List<Footer> readAllFootersInParallelUsingSummaryFiles(Configuration configuration, List<FileStatus> partFiles) throws IOException {
- return readAllFootersInParallelUsingSummaryFiles(configuration, partFiles, false);
- }
-
- private static MetadataFilter filter(boolean skipRowGroups) {
- return skipRowGroups ? SKIP_ROW_GROUPS : NO_FILTER;
- }
-
- /**
- * for files provided, check if there's a summary file.
- * If a summary file is found it is used otherwise the file footer is used.
- * @param configuration the hadoop conf to connect to the file system;
- * @param partFiles the part files to read
- * @param skipRowGroups to skipRowGroups in the footers
- * @return the footers for those files using the summary file if possible.
- * @throws IOException
- */
- public static List<Footer> readAllFootersInParallelUsingSummaryFiles(
- final Configuration configuration,
- final Collection<FileStatus> partFiles,
- final boolean skipRowGroups) throws IOException {
-
- // figure out list of all parents to part files
- Set<Path> parents = new HashSet<Path>();
- for (FileStatus part : partFiles) {
- parents.add(part.getPath().getParent());
- }
-
- // read corresponding summary files if they exist
- List<Callable<Map<Path, Footer>>> summaries = new ArrayList<Callable<Map<Path, Footer>>>();
- for (final Path path : parents) {
- summaries.add(new Callable<Map<Path, Footer>>() {
- @Override
- public Map<Path, Footer> call() throws Exception {
- ParquetMetadata mergedMetadata = readSummaryMetadata(configuration, path, skipRowGroups);
- if (mergedMetadata != null) {
- final List<Footer> footers;
- if (skipRowGroups) {
- footers = new ArrayList<Footer>();
- for (FileStatus f : partFiles) {
- footers.add(new Footer(f.getPath(), mergedMetadata));
- }
- } else {
- footers = footersFromSummaryFile(path, mergedMetadata);
- }
- Map<Path, Footer> map = new HashMap<Path, Footer>();
- for (Footer footer : footers) {
- // the folder may have been moved
- footer = new Footer(new Path(path, footer.getFile().getName()), footer.getParquetMetadata());
- map.put(footer.getFile(), footer);
- }
- return map;
- } else {
- return Collections.emptyMap();
- }
- }
- });
- }
-
- Map<Path, Footer> cache = new HashMap<Path, Footer>();
- try {
- List<Map<Path, Footer>> footersFromSummaries = runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), summaries);
- for (Map<Path, Footer> footers : footersFromSummaries) {
- cache.putAll(footers);
- }
- } catch (ExecutionException e) {
- throw new IOException("Error reading summaries", e);
- }
-
- // keep only footers for files actually requested and read file footer if not found in summaries
- List<Footer> result = new ArrayList<Footer>(partFiles.size());
- List<FileStatus> toRead = new ArrayList<FileStatus>();
- for (FileStatus part : partFiles) {
- Footer f = cache.get(part.getPath());
- if (f != null) {
- result.add(f);
- } else {
- toRead.add(part);
- }
- }
-
- if (toRead.size() > 0) {
- // read the footers of the files that did not have a summary file
- if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers");
- result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
- }
-
- return result;
- }
-
- private static <T> List<T> runAllInParallel(int parallelism, List<Callable<T>> toRun) throws ExecutionException {
- LOG.info("Initiating action with parallelism: " + parallelism);
- ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
- try {
- List<Future<T>> futures = new ArrayList<Future<T>>();
- for (Callable<T> callable : toRun) {
- futures.add(threadPool.submit(callable));
- }
- List<T> result = new ArrayList<T>(toRun.size());
- for (Future<T> future : futures) {
- try {
- result.add(future.get());
- } catch (InterruptedException e) {
- throw new RuntimeException("The thread was interrupted", e);
- }
- }
- return result;
- } finally {
- threadPool.shutdownNow();
- }
- }
-
- @Deprecated
- public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles) throws IOException {
- return readAllFootersInParallel(configuration, partFiles, false);
- }
-
- /**
- * read all the footers of the files provided
- * (not using summary files)
- * @param configuration the conf to access the File System
- * @param partFiles the files to read
- * @param skipRowGroups to skip the rowGroup info
- * @return the footers
- * @throws IOException
- */
- public static List<Footer> readAllFootersInParallel(final Configuration configuration, List<FileStatus> partFiles, final boolean skipRowGroups) throws IOException {
- List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
- for (final FileStatus currentFile : partFiles) {
- footers.add(new Callable<Footer>() {
- @Override
- public Footer call() throws Exception {
- try {
- return new Footer(currentFile.getPath(), readFooter(configuration, currentFile, filter(skipRowGroups)));
- } catch (IOException e) {
- throw new IOException("Could not read footer for file " + currentFile, e);
- }
- }
- });
- }
- try {
- return runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), footers);
- } catch (ExecutionException e) {
- throw new IOException("Could not read footer: " + e.getMessage(), e.getCause());
- }
- }
-
- /**
- * Read the footers of all the files under that path (recursively)
- * not using summary files.
- * rowGroups are not skipped
- * @param configuration the configuration to access the FS
- * @param fileStatus the root dir
- * @return all the footers
- * @throws IOException
- */
- public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException {
- List<FileStatus> statuses = listFiles(configuration, fileStatus);
- return readAllFootersInParallel(configuration, statuses, false);
- }
-
- @Deprecated
- public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException {
- return readFooters(configuration, status(configuration, path));
- }
-
- private static FileStatus status(Configuration configuration, Path path) throws IOException {
- return path.getFileSystem(configuration).getFileStatus(path);
- }
-
- /**
- * this always returns the row groups
- * @param configuration
- * @param pathStatus
- * @return
- * @throws IOException
- */
- @Deprecated
- public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus) throws IOException {
- return readFooters(configuration, pathStatus, false);
- }
-
- /**
- * Read the footers of all the files under that path (recursively)
- * using summary files if possible
- * @param configuration the configuration to access the FS
-   * @param pathStatus the root dir
-   * @param skipRowGroups whether to skip the row group info in the footers
-   * @return all the footers
- * @throws IOException
- */
- public static List<Footer> readFooters(Configuration configuration, FileStatus pathStatus, boolean skipRowGroups) throws IOException {
- List<FileStatus> files = listFiles(configuration, pathStatus);
- return readAllFootersInParallelUsingSummaryFiles(configuration, files, skipRowGroups);
- }
-
- private static List<FileStatus> listFiles(Configuration conf, FileStatus fileStatus) throws IOException {
- if (fileStatus.isDir()) {
- FileSystem fs = fileStatus.getPath().getFileSystem(conf);
- FileStatus[] list = fs.listStatus(fileStatus.getPath(), HiddenFileFilter.INSTANCE);
- List<FileStatus> result = new ArrayList<FileStatus>();
- for (FileStatus sub : list) {
- result.addAll(listFiles(conf, sub));
- }
- return result;
- } else {
- return Arrays.asList(fileStatus);
- }
- }
-
- /**
- * Specifically reads a given summary file
- * @param configuration
- * @param summaryStatus
- * @return the metadata translated for each file
- * @throws IOException
- */
- public static List<Footer> readSummaryFile(Configuration configuration, FileStatus summaryStatus) throws IOException {
- final Path parent = summaryStatus.getPath().getParent();
- ParquetMetadata mergedFooters = readFooter(configuration, summaryStatus, filter(false));
- return footersFromSummaryFile(parent, mergedFooters);
- }
-
- static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups) throws IOException {
- Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
- Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
- FileSystem fileSystem = basePath.getFileSystem(configuration);
- if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
- // reading the summary file that does not contain the row groups
- if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
- return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
- } else if (fileSystem.exists(metadataFile)) {
- if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
- return readFooter(configuration, metadataFile, filter(skipRowGroups));
- } else {
- return null;
- }
- }
-
- static List<Footer> footersFromSummaryFile(final Path parent, ParquetMetadata mergedFooters) {
- Map<Path, ParquetMetadata> footers = new HashMap<Path, ParquetMetadata>();
- List<BlockMetaData> blocks = mergedFooters.getBlocks();
- for (BlockMetaData block : blocks) {
- String path = block.getPath();
- Path fullPath = new Path(parent, path);
- ParquetMetadata current = footers.get(fullPath);
- if (current == null) {
- current = new ParquetMetadata(mergedFooters.getFileMetaData(), new ArrayList<BlockMetaData>());
- footers.put(fullPath, current);
- }
- current.getBlocks().add(block);
- }
- List<Footer> result = new ArrayList<Footer>();
- for (Entry<Path, ParquetMetadata> entry : footers.entrySet()) {
- result.add(new Footer(entry.getKey(), entry.getValue()));
- }
- return result;
- }
-
- /**
- * Reads the meta data block in the footer of the file
- * @param configuration
- * @param file the parquet File
- * @return the metadata blocks in the footer
- * @throws IOException if an error occurs while reading the file
- */
- @Deprecated
- public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
- return readFooter(configuration, file, NO_FILTER);
- }
-
- /**
- * Reads the meta data in the footer of the file.
- * Skipping row groups (or not) based on the provided filter
- * @param configuration
- * @param file the Parquet File
- * @param filter the filter to apply to row groups
- * @return the metadata with row groups filtered.
- * @throws IOException if an error occurs while reading the file
- */
- public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
- FileSystem fileSystem = file.getFileSystem(configuration);
- return readFooter(configuration, fileSystem.getFileStatus(file), filter);
- }
-
- /**
- * @deprecated use {@link ParquetFileReader#readFooter(Configuration, FileStatus, MetadataFilter)}
- */
- @Deprecated
- public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file) throws IOException {
- return readFooter(configuration, file, NO_FILTER);
- }
-
- /**
- * Reads the meta data block in the footer of the file
- * @param configuration
- * @param file the parquet File
- * @param filter the filter to apply to row groups
- * @return the metadata blocks in the footer
- * @throws IOException if an error occurs while reading the file
- */
- public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
- FileSystem fileSystem = file.getPath().getFileSystem(configuration);
- FSDataInputStream f = fileSystem.open(file.getPath());
- try {
- long l = file.getLen();
- if (Log.DEBUG) LOG.debug("File length " + l);
- int FOOTER_LENGTH_SIZE = 4;
- if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
- throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)");
- }
- long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length;
- if (Log.DEBUG) LOG.debug("reading footer index at " + footerLengthIndex);
-
- f.seek(footerLengthIndex);
- int footerLength = readIntLittleEndian(f);
- byte[] magic = new byte[MAGIC.length];
- f.readFully(magic);
- if (!Arrays.equals(MAGIC, magic)) {
- throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
- }
- long footerIndex = footerLengthIndex - footerLength;
- if (Log.DEBUG) LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
- if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
- throw new RuntimeException("corrupted file: the footer index is not within the file");
- }
- f.seek(footerIndex);
- return converter.readParquetMetadata(f, filter);
- } finally {
- f.close();
- }
- }
-
- private final CodecFactory codecFactory;
- private final List<BlockMetaData> blocks;
- private final FSDataInputStream f;
- private final Path filePath;
- private int currentBlock = 0;
- private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
-
- /**
-   * @param configuration the Hadoop configuration used to open the file
-   * @param filePath the Parquet file (will be opened for read in this constructor)
-   * @param blocks the blocks to read
-   * @param columns the columns to read (their path)
- * @throws IOException if the file can not be opened
- */
- public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
- this.filePath = filePath;
- FileSystem fs = filePath.getFileSystem(configuration);
- this.f = fs.open(filePath);
- this.blocks = blocks;
- for (ColumnDescriptor col : columns) {
- paths.put(ColumnPath.get(col.getPath()), col);
- }
- this.codecFactory = new CodecFactory(configuration);
- }
-
- /**
- * Reads all the columns requested from the row group at the current file position.
- * @throws IOException if an error occurs while reading
- * @return the PageReadStore which can provide PageReaders for each column.
- */
- public PageReadStore readNextRowGroup() throws IOException {
- if (currentBlock == blocks.size()) {
- return null;
- }
- BlockMetaData block = blocks.get(currentBlock);
- if (block.getRowCount() == 0) {
- throw new RuntimeException("Illegal row group of 0 rows");
- }
- ColumnChunkPageReadStore columnChunkPageReadStore = new ColumnChunkPageReadStore(block.getRowCount());
- // prepare the list of consecutive chunks to read them in one scan
- List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
- ConsecutiveChunkList currentChunks = null;
- for (ColumnChunkMetaData mc : block.getColumns()) {
- ColumnPath pathKey = mc.getPath();
- BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
- ColumnDescriptor columnDescriptor = paths.get(pathKey);
- if (columnDescriptor != null) {
- long startingPos = mc.getStartingPos();
- // first chunk or not consecutive => new list
- if (currentChunks == null || currentChunks.endPos() != startingPos) {
- currentChunks = new ConsecutiveChunkList(startingPos);
- allChunks.add(currentChunks);
- }
- currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
- }
- }
- // actually read all the chunks
- for (ConsecutiveChunkList consecutiveChunks : allChunks) {
- final List<Chunk> chunks = consecutiveChunks.readAll(f);
- for (Chunk chunk : chunks) {
- columnChunkPageReadStore.addColumn(chunk.descriptor.col, chunk.readAllPages());
- }
- }
- ++currentBlock;
- return columnChunkPageReadStore;
- }
-
-
-
- @Override
- public void close() throws IOException {
- f.close();
- this.codecFactory.release();
- }
-
- /**
- * The data for a column chunk
- *
- * @author Julien Le Dem
- *
- */
- private class Chunk extends ByteArrayInputStream {
-
- private final ChunkDescriptor descriptor;
-
- /**
- *
- * @param descriptor descriptor for the chunk
- * @param data contains the chunk data at offset
- * @param offset where the chunk starts in offset
- */
- public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) {
- super(data);
- this.descriptor = descriptor;
- this.pos = offset;
- }
-
- protected PageHeader readPageHeader() throws IOException {
- return Util.readPageHeader(this);
- }
-
- /**
- * Read all of the pages in a given column chunk.
- * @return the list of pages
- */
- public ColumnChunkPageReader readAllPages() throws IOException {
- List<DataPage> pagesInChunk = new ArrayList<DataPage>();
- DictionaryPage dictionaryPage = null;
- long valuesCountReadSoFar = 0;
- while (valuesCountReadSoFar < descriptor.metadata.getValueCount()) {
- PageHeader pageHeader = readPageHeader();
- int uncompressedPageSize = pageHeader.getUncompressed_page_size();
- int compressedPageSize = pageHeader.getCompressed_page_size();
- switch (pageHeader.type) {
- case DICTIONARY_PAGE:
- // there is only one dictionary page per column chunk
- if (dictionaryPage != null) {
- throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
- }
- DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
- dictionaryPage =
- new DictionaryPage(
- this.readAsBytesInput(compressedPageSize),
- uncompressedPageSize,
- dicHeader.getNum_values(),
- converter.getEncoding(dicHeader.getEncoding())
- );
- break;
- case DATA_PAGE:
- DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
- pagesInChunk.add(
- new DataPageV1(
- this.readAsBytesInput(compressedPageSize),
- dataHeaderV1.getNum_values(),
- uncompressedPageSize,
- fromParquetStatistics(dataHeaderV1.getStatistics(), descriptor.col.getType()),
- converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
- converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
- converter.getEncoding(dataHeaderV1.getEncoding())
- ));
- valuesCountReadSoFar += dataHeaderV1.getNum_values();
- break;
- case DATA_PAGE_V2:
- DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
- int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
- pagesInChunk.add(
- new DataPageV2(
- dataHeaderV2.getNum_rows(),
- dataHeaderV2.getNum_nulls(),
- dataHeaderV2.getNum_values(),
- this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
- this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
- converter.getEncoding(dataHeaderV2.getEncoding()),
- this.readAsBytesInput(dataSize),
- uncompressedPageSize,
- fromParquetStatistics(dataHeaderV2.getStatistics(), descriptor.col.getType()),
- dataHeaderV2.isIs_compressed()
- ));
- valuesCountReadSoFar += dataHeaderV2.getNum_values();
- break;
- default:
- if (DEBUG) LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize);
- this.skip(compressedPageSize);
- break;
- }
- }
- if (valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
- // Would be nice to have a CorruptParquetFileException or something as a subclass?
- throw new IOException(
- "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
- filePath + " offset " + descriptor.metadata.getFirstDataPageOffset() +
- " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
- + " pages ending at file offset " + (descriptor.fileOffset + pos()));
- }
- BytesDecompressor decompressor = codecFactory.getDecompressor(descriptor.metadata.getCodec());
- return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
- }
-
- /**
- * @return the current position in the chunk
- */
- public int pos() {
- return this.pos;
- }
-
- /**
- * @param size the size of the page
- * @return the page
- * @throws IOException
- */
- public BytesInput readAsBytesInput(int size) throws IOException {
- final BytesInput r = BytesInput.from(this.buf, this.pos, size);
- this.pos += size;
- return r;
- }
-
- }
-
- /**
- * deals with a now fixed bug where compressedLength was missing a few bytes.
- *
- * @author Julien Le Dem
- *
- */
- private class WorkaroundChunk extends Chunk {
-
- private final FSDataInputStream f;
-
- /**
- * @param descriptor the descriptor of the chunk
- * @param data contains the data of the chunk at offset
- * @param offset where the chunk starts in data
- * @param f the file stream positioned at the end of this chunk
- */
- private WorkaroundChunk(ChunkDescriptor descriptor, byte[] data, int offset, FSDataInputStream f) {
- super(descriptor, data, offset);
- this.f = f;
- }
-
- protected PageHeader readPageHeader() throws IOException {
- PageHeader pageHeader;
- int initialPos = this.pos;
- try {
- pageHeader = Util.readPageHeader(this);
- } catch (IOException e) {
- // this is to workaround a bug where the compressedLength
- // of the chunk is missing the size of the header of the dictionary
- // to allow reading older files (using dictionary) we need this.
- // usually 13 to 19 bytes are missing
- // if the last page is smaller than this, the page header itself is truncated in the buffer.
- this.pos = initialPos; // resetting the buffer to the position before we got the error
- LOG.info("completing the column chunk to read the page header");
- pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
- }
- return pageHeader;
- }
-
- public BytesInput readAsBytesInput(int size) throws IOException {
- if (pos + size > count) {
- // this is to workaround a bug where the compressedLength
- // of the chunk is missing the size of the header of the dictionary
- // to allow reading older files (using dictionary) we need this.
- // usually 13 to 19 bytes are missing
- int l1 = count - pos;
- int l2 = size - l1;
- LOG.info("completed the column chunk with " + l2 + " bytes");
- return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
- }
- return super.readAsBytesInput(size);
- }
-
- }
-
-
- /**
- * information needed to read a column chunk
- */
- private static class ChunkDescriptor {
-
- private final ColumnDescriptor col;
- private final ColumnChunkMetaData metadata;
- private final long fileOffset;
- private final int size;
-
- /**
- * @param col column this chunk is part of
- * @param metadata metadata for the column
- * @param fileOffset offset in the file where this chunk starts
- * @param size size of the chunk
- */
- private ChunkDescriptor(
- ColumnDescriptor col,
- ColumnChunkMetaData metadata,
- long fileOffset,
- int size) {
- super();
- this.col = col;
- this.metadata = metadata;
- this.fileOffset = fileOffset;
- this.size = size;
- }
- }
-
- /**
- * describes a list of consecutive column chunks to be read at once.
- *
- * @author Julien Le Dem
- */
- private class ConsecutiveChunkList {
-
- private final long offset;
- private int length;
- private final List<ChunkDescriptor> chunks = new ArrayList<ChunkDescriptor>();
-
- /**
- * @param offset where the first chunk starts
- */
- ConsecutiveChunkList(long offset) {
- this.offset = offset;
- }
-
- /**
- * adds a chunk to the list.
- * It must be consecutive to the previous chunk
- * @param descriptor
- */
- public void addChunk(ChunkDescriptor descriptor) {
- chunks.add(descriptor);
- length += descriptor.size;
- }
-
- /**
- * @param f file to read the chunks from
- * @return the chunks
- * @throws IOException
- */
- public List<Chunk> readAll(FSDataInputStream f) throws IOException {
- List<Chunk> result = new ArrayList<Chunk>(chunks.size());
- f.seek(offset);
- byte[] chunksBytes = new byte[length];
- f.readFully(chunksBytes);
- // report in a counter the data we just scanned
- BenchmarkCounter.incrementBytesRead(length);
- int currentChunkOffset = 0;
- for (int i = 0; i < chunks.size(); i++) {
- ChunkDescriptor descriptor = chunks.get(i);
- if (i < chunks.size() - 1) {
- result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset));
- } else {
- // because of a bug, the last chunk might be larger than descriptor.size
- result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f));
- }
- currentChunkOffset += descriptor.size;
- }
- return result ;
- }
-
- /**
- * @return the position following the last byte of these chunks
- */
- public long endPos() {
- return offset + length;
- }
-
- }
-
-}
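The footer lookup in readFooter() above relies on the fixed Parquet file tail
(footer bytes, a 4-byte little-endian footer length, then the "PAR1" magic).
A minimal standalone sketch of that tail parsing for a local file, using only
the JDK and not the reader's own API, could look like this (the helper name is
hypothetical):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.util.Arrays;

// Standalone sketch: locate the footer of a local Parquet file by reading the
// 4-byte little-endian footer length and verifying the trailing magic bytes.
public class FooterTailSketch {
  private static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
  private static final int FOOTER_LENGTH_SIZE = 4;

  /** Returns the offset at which the footer starts, or throws if the tail is invalid. */
  public static long footerOffset(String path) throws IOException {
    RandomAccessFile file = new RandomAccessFile(path, "r");
    try {
      long len = file.length();
      if (len < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) {
        throw new IOException(path + " is too small to be a Parquet file");
      }
      long footerLengthIndex = len - FOOTER_LENGTH_SIZE - MAGIC.length;
      file.seek(footerLengthIndex);
      int b0 = file.read(), b1 = file.read(), b2 = file.read(), b3 = file.read();
      int footerLength = b0 | (b1 << 8) | (b2 << 16) | (b3 << 24); // little endian
      byte[] magic = new byte[MAGIC.length];
      file.readFully(magic);
      if (!Arrays.equals(MAGIC, magic)) {
        throw new IOException(path + " does not end with the PAR1 magic bytes");
      }
      long footerIndex = footerLengthIndex - footerLength;
      if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
        throw new IOException("corrupted file: footer index is not within the file");
      }
      return footerIndex;
    } finally {
      file.close();
    }
  }
}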
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
deleted file mode 100644
index 6868717..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ /dev/null
@@ -1,553 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.Version;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DictionaryPage;
-import parquet.column.statistics.Statistics;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.FileMetaData;
-import parquet.hadoop.metadata.GlobalMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-/**
- * Internal implementation of the Parquet file writer as a block container
- *
- * @author Julien Le Dem
- *
- */
-public class ParquetFileWriter {
- private static final Log LOG = Log.getLog(ParquetFileWriter.class);
-
- public static final String PARQUET_METADATA_FILE = "_metadata";
- public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
- public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
- public static final int CURRENT_VERSION = 1;
-
- // File creation modes
- public static enum Mode {
- CREATE,
- OVERWRITE
- }
-
- private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-
- private final MessageType schema;
- private final FSDataOutputStream out;
- private BlockMetaData currentBlock;
- private ColumnChunkMetaData currentColumn;
- private long currentRecordCount;
- private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- private long uncompressedLength;
- private long compressedLength;
- private Set<parquet.column.Encoding> currentEncodings;
-
- private CompressionCodecName currentChunkCodec;
- private ColumnPath currentChunkPath;
- private PrimitiveTypeName currentChunkType;
- private long currentChunkFirstDataPage;
- private long currentChunkDictionaryPageOffset;
- private long currentChunkValueCount;
-
- private Statistics currentStatistics;
-
- /**
- * Captures the order in which methods should be called
- *
- * @author Julien Le Dem
- *
- */
- private enum STATE {
- NOT_STARTED {
- STATE start() {
- return STARTED;
- }
- },
- STARTED {
- STATE startBlock() {
- return BLOCK;
- }
- STATE end() {
- return ENDED;
- }
- },
- BLOCK {
- STATE startColumn() {
- return COLUMN;
- }
- STATE endBlock() {
- return STARTED;
- }
- },
- COLUMN {
- STATE endColumn() {
- return BLOCK;
- };
- STATE write() {
- return this;
- }
- },
- ENDED;
-
- STATE start() throws IOException { return error(); }
- STATE startBlock() throws IOException { return error(); }
- STATE startColumn() throws IOException { return error(); }
- STATE write() throws IOException { return error(); }
- STATE endColumn() throws IOException { return error(); }
- STATE endBlock() throws IOException { return error(); }
- STATE end() throws IOException { return error(); }
-
- private final STATE error() throws IOException {
- throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
- }
- }
-
- private STATE state = STATE.NOT_STARTED;
-
- /**
- * @param configuration Hadoop configuration
- * @param schema the schema of the data
- * @param file the file to write to
- * @throws IOException if the file can not be created
- */
- public ParquetFileWriter(Configuration configuration, MessageType schema,
- Path file) throws IOException {
- this(configuration, schema, file, Mode.CREATE);
- }
-
- /**
- * @param configuration Hadoop configuration
- * @param schema the schema of the data
- * @param file the file to write to
- * @param mode file creation mode
- * @throws IOException if the file can not be created
- */
- public ParquetFileWriter(Configuration configuration, MessageType schema,
- Path file, Mode mode) throws IOException {
- super();
- this.schema = schema;
- FileSystem fs = file.getFileSystem(configuration);
- boolean overwriteFlag = (mode == Mode.OVERWRITE);
- this.out = fs.create(file, overwriteFlag);
- }
-
- /**
- * start the file
- * @throws IOException
- */
- public void start() throws IOException {
- state = state.start();
- if (DEBUG) LOG.debug(out.getPos() + ": start");
- out.write(MAGIC);
- }
-
- /**
- * start a block
- * @param recordCount the record count in this block
- * @throws IOException
- */
- public void startBlock(long recordCount) throws IOException {
- state = state.startBlock();
- if (DEBUG) LOG.debug(out.getPos() + ": start block");
-// out.write(MAGIC); // TODO: add a magic delimiter
- currentBlock = new BlockMetaData();
- currentRecordCount = recordCount;
- }
-
- /**
- * start a column inside a block
-   * @param descriptor the column descriptor
-   * @param valueCount the value count in this column
-   * @param compressionCodecName the codec used to compress this column
- * @throws IOException
- */
- public void startColumn(ColumnDescriptor descriptor,
- long valueCount,
- CompressionCodecName compressionCodecName) throws IOException {
- state = state.startColumn();
- if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
- currentEncodings = new HashSet<parquet.column.Encoding>();
- currentChunkPath = ColumnPath.get(descriptor.getPath());
- currentChunkType = descriptor.getType();
- currentChunkCodec = compressionCodecName;
- currentChunkValueCount = valueCount;
- currentChunkFirstDataPage = out.getPos();
- compressedLength = 0;
- uncompressedLength = 0;
- // need to know what type of stats to initialize to
- // better way to do this?
- currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
- }
-
- /**
- * writes a dictionary page
- * @param dictionaryPage the dictionary page
- */
- public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- state = state.write();
- if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
- currentChunkDictionaryPageOffset = out.getPos();
- int uncompressedSize = dictionaryPage.getUncompressedSize();
- int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
- metadataConverter.writeDictionaryPageHeader(
- uncompressedSize,
- compressedPageSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding(),
- out);
- long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
- this.uncompressedLength += uncompressedSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
- dictionaryPage.getBytes().writeAllTo(out);
- currentEncodings.add(dictionaryPage.getEncoding());
- }
-
-
- /**
- * writes a single page
- * @param valueCount count of values
- * @param uncompressedPageSize the size of the data once uncompressed
- * @param bytes the compressed data for the page without header
- * @param rlEncoding encoding of the repetition level
- * @param dlEncoding encoding of the definition level
- * @param valuesEncoding encoding of values
- * @deprecated this overload does not record statistics; prefer the variant that also takes a Statistics argument
- */
- @Deprecated
- public void writeDataPage(
- int valueCount, int uncompressedPageSize,
- BytesInput bytes,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding) throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
- int compressedPageSize = (int)bytes.size();
- metadataConverter.writeDataPageHeader(
- uncompressedPageSize, compressedPageSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out);
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
- bytes.writeAllTo(out);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
- }
-
- /**
- * writes a single page
- * @param valueCount count of values
- * @param uncompressedPageSize the size of the data once uncompressed
- * @param bytes the compressed data for the page without header
- * @param statistics the statistics of the page
- * @param rlEncoding encoding of the repetition level
- * @param dlEncoding encoding of the definition level
- * @param valuesEncoding encoding of values
- */
- public void writeDataPage(
- int valueCount, int uncompressedPageSize,
- BytesInput bytes,
- Statistics statistics,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding) throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
- int compressedPageSize = (int)bytes.size();
- metadataConverter.writeDataPageHeader(
- uncompressedPageSize, compressedPageSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out);
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
- bytes.writeAllTo(out);
- currentStatistics.mergeStatistics(statistics);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
- }
-
- /**
- * writes a number of pages at once
- * @param bytes bytes to be written including page headers
- * @param uncompressedTotalPageSize total uncompressed size (without page headers)
- * @param compressedTotalPageSize total compressed size (without page headers)
- * @param totalStats the statistics accumulated over these pages
- * @param encodings the encodings used in these pages
- * @throws IOException
- */
- void writeDataPages(BytesInput bytes,
- long uncompressedTotalPageSize,
- long compressedTotalPageSize,
- Statistics totalStats,
- List<parquet.column.Encoding> encodings) throws IOException {
- state = state.write();
- if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
- long headersSize = bytes.size() - compressedTotalPageSize;
- this.uncompressedLength += uncompressedTotalPageSize + headersSize;
- this.compressedLength += compressedTotalPageSize + headersSize;
- if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
- bytes.writeAllTo(out);
- currentEncodings.addAll(encodings);
- currentStatistics = totalStats;
- }
-
- /**
- * end a column (once all rep, def and data have been written)
- * @throws IOException
- */
- public void endColumn() throws IOException {
- state = state.endColumn();
- if (DEBUG) LOG.debug(out.getPos() + ": end column");
- currentBlock.addColumn(ColumnChunkMetaData.get(
- currentChunkPath,
- currentChunkType,
- currentChunkCodec,
- currentEncodings,
- currentStatistics,
- currentChunkFirstDataPage,
- currentChunkDictionaryPageOffset,
- currentChunkValueCount,
- compressedLength,
- uncompressedLength));
- if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
- currentColumn = null;
- this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
- this.uncompressedLength = 0;
- this.compressedLength = 0;
- }
-
- /**
- * ends a block once all column chunks have been written
- * @throws IOException
- */
- public void endBlock() throws IOException {
- state = state.endBlock();
- if (DEBUG) LOG.debug(out.getPos() + ": end block");
- currentBlock.setRowCount(currentRecordCount);
- blocks.add(currentBlock);
- currentBlock = null;
- }
-
- /**
- * ends a file once all blocks have been written.
- * closes the file.
- * @param extraMetaData the extra meta data to write in the footer
- * @throws IOException
- */
- public void end(Map<String, String> extraMetaData) throws IOException {
- state = state.end();
- if (DEBUG) LOG.debug(out.getPos() + ": end");
- ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
- serializeFooter(footer, out);
- out.close();
- }
-
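  // Usage sketch (illustrative only; conf, schema, file, descriptor, recordCount,
  // valueCount, uncompressedSize, pageBytes, pageStats, rlEncoding, dlEncoding,
  // valuesEncoding and extraMetaData are placeholders a caller would supply).
  // It shows the call order the STATE machine above enforces for a file with a
  // single row group and a single column chunk:
  //
  //   ParquetFileWriter w = new ParquetFileWriter(conf, schema, file);
  //   w.start();                                   // NOT_STARTED -> STARTED
  //   w.startBlock(recordCount);                   // STARTED -> BLOCK
  //   w.startColumn(descriptor, valueCount,
  //       CompressionCodecName.SNAPPY);            // BLOCK -> COLUMN
  //   w.writeDataPage(valueCount, uncompressedSize,
  //       pageBytes, pageStats,
  //       rlEncoding, dlEncoding, valuesEncoding); // stays in COLUMN
  //   w.endColumn();                               // COLUMN -> BLOCK
  //   w.endBlock();                                // BLOCK -> STARTED
  //   w.end(extraMetaData);                        // STARTED -> ENDED, writes the footer and closes the file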
- private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
- long footerIndex = out.getPos();
- parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
- writeFileMetaData(parquetMetadata, out);
- if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
- BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
- out.write(MAGIC);
- }
-
- /**
- * writes a _metadata and _common_metadata file
- * @param configuration the configuration to use to get the FileSystem
- * @param outputPath the directory to write the _metadata file to
- * @param footers the list of footers to merge
- * @throws IOException
- */
- public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
- ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
- FileSystem fs = outputPath.getFileSystem(configuration);
- outputPath = outputPath.makeQualified(fs);
- writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
- metadataFooter.getBlocks().clear();
- writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
- }
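  // Usage sketch (illustrative; conf, outputRoot and footers are placeholders):
  // once all part files under outputRoot have been written, their footers can be
  // merged into the summary files placed next to the data.
  //
  //   List<Footer> footers = ...; // one Footer per part file under outputRoot
  //   ParquetFileWriter.writeMetadataFile(conf, outputRoot, footers);
  //   // produces outputRoot/_metadata (with row groups) and
  //   // outputRoot/_common_metadata (file metadata only, row groups cleared)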
-
- private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
- throws IOException {
- Path metaDataPath = new Path(outputPath, parquetMetadataFile);
- FSDataOutputStream metadata = fs.create(metaDataPath);
- metadata.write(MAGIC);
- serializeFooter(metadataFooter, metadata);
- metadata.close();
- }
-
- static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
- String rootPath = root.toUri().getPath();
- GlobalMetaData fileMetaData = null;
- List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- for (Footer footer : footers) {
- String footerPath = footer.getFile().toUri().getPath();
- if (!footerPath.startsWith(rootPath)) {
- throw new ParquetEncodingException(footerPath + " invalid: all the files must be contained in the root " + root);
- }
- footerPath = footerPath.substring(rootPath.length());
- while (footerPath.startsWith("/")) {
- footerPath = footerPath.substring(1);
- }
- fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
- for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
- block.setPath(footerPath);
- blocks.add(block);
- }
- }
- return new ParquetMetadata(fileMetaData.merge(), blocks);
- }
-
- /**
- * @return the current position in the underlying file
- * @throws IOException
- */
- public long getPos() throws IOException {
- return out.getPos();
- }
-
- /**
- * Will merge the metadata of all the footers together
- * @param footers the list of file footers to merge
- * @return the global meta data for all the footers
- */
- static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
- return getGlobalMetaData(footers, true);
- }
-
- static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
- GlobalMetaData fileMetaData = null;
- for (Footer footer : footers) {
- ParquetMetadata currentMetadata = footer.getParquetMetadata();
- fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
- }
- return fileMetaData;
- }
-
- /**
- * Will return the result of merging toMerge into mergedMetadata
- * @param toMerge the metadata to merge
- * @param mergedMetadata the reference metadata to merge into
- * @return the result of the merge
- */
- static GlobalMetaData mergeInto(
- FileMetaData toMerge,
- GlobalMetaData mergedMetadata) {
- return mergeInto(toMerge, mergedMetadata, true);
- }
-
- static GlobalMetaData mergeInto(
- FileMetaData toMerge,
- GlobalMetaData mergedMetadata,
- boolean strict) {
- MessageType schema = null;
- Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
- Set<String> createdBy = new HashSet<String>();
- if (mergedMetadata != null) {
- schema = mergedMetadata.getSchema();
- newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
- createdBy.addAll(mergedMetadata.getCreatedBy());
- }
- if ((schema == null && toMerge.getSchema() != null)
- || (schema != null && !schema.equals(toMerge.getSchema()))) {
- schema = mergeInto(toMerge.getSchema(), schema, strict);
- }
- for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
- Set<String> values = newKeyValues.get(entry.getKey());
- if (values == null) {
- values = new HashSet<String>();
- newKeyValues.put(entry.getKey(), values);
- }
- values.add(entry.getValue());
- }
- createdBy.add(toMerge.getCreatedBy());
- return new GlobalMetaData(
- schema,
- newKeyValues,
- createdBy);
- }
-
- /**
- * will return the result of merging toMerge into mergedSchema
- * @param toMerge the schema to merge into mergedSchema
- * @param mergedSchema the schema to append the fields to
- * @return the resulting schema
- */
- static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
- return mergeInto(toMerge, mergedSchema, true);
- }
-
- /**
- * will return the result of merging toMerge into mergedSchema
- * @param toMerge the schema to merge into mergedSchema
- * @param mergedSchema the schema to append the fields to
- * @param strict should schema primitive types match
- * @return the resulting schema
- */
- static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
- if (mergedSchema == null) {
- return toMerge;
- }
-
- return mergedSchema.union(toMerge, strict);
- }
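  // Worked example (sketch; the schemas below are illustrative):
  // merging a schema that adds an optional field into a previously seen schema
  // yields their union; with strict=true a primitive type conflict on a shared
  // field is rejected rather than silently widened.
  //
  //   MessageType a = MessageTypeParser.parseMessageType(
  //       "message m { required int32 id; }");
  //   MessageType b = MessageTypeParser.parseMessageType(
  //       "message m { required int32 id; optional binary name; }");
  //   MessageType merged = mergeInto(b, a, true); // contains both id and name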
-
-}