You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:32 UTC
[35/51] [partial] parquet-mr git commit: PARQUET-23: Rename to
org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/Encoding.java b/parquet-column/src/main/java/parquet/column/Encoding.java
deleted file mode 100644
index 7aa5c1d..0000000
--- a/parquet-column/src/main/java/parquet/column/Encoding.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import static parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
-
-import java.io.IOException;
-
-import parquet.bytes.BytesUtils;
-import parquet.column.page.DictionaryPage;
-import parquet.column.values.ValuesReader;
-import parquet.column.values.bitpacking.ByteBitPackingValuesReader;
-import parquet.column.values.boundedint.ZeroIntegerValuesReader;
-import parquet.column.values.delta.DeltaBinaryPackingValuesReader;
-import parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
-import parquet.column.values.deltastrings.DeltaByteArrayReader;
-import parquet.column.values.dictionary.DictionaryValuesReader;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainBinaryDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainDoubleDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainFloatDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainIntegerDictionary;
-import parquet.column.values.dictionary.PlainValuesDictionary.PlainLongDictionary;
-import parquet.column.values.plain.BinaryPlainValuesReader;
-import parquet.column.values.plain.FixedLenByteArrayPlainValuesReader;
-import parquet.column.values.plain.BooleanPlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader;
-import parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader;
-import parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
-import parquet.io.ParquetDecodingException;
-
-/**
- * encoding of the data
- *
- * @author Julien Le Dem
- *
- */
-public enum Encoding {
-
- PLAIN {
- @Override
- public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
- switch (descriptor.getType()) {
- case BOOLEAN:
- return new BooleanPlainValuesReader();
- case BINARY:
- return new BinaryPlainValuesReader();
- case FLOAT:
- return new FloatPlainValuesReader();
- case DOUBLE:
- return new DoublePlainValuesReader();
- case INT32:
- return new IntegerPlainValuesReader();
- case INT64:
- return new LongPlainValuesReader();
- case INT96:
- return new FixedLenByteArrayPlainValuesReader(12);
- case FIXED_LEN_BYTE_ARRAY:
- return new FixedLenByteArrayPlainValuesReader(descriptor.getTypeLength());
- default:
- throw new ParquetDecodingException("no plain reader for type " + descriptor.getType());
- }
- }
-
- @Override
- public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
- switch (descriptor.getType()) {
- case BINARY:
- return new PlainBinaryDictionary(dictionaryPage);
- case FIXED_LEN_BYTE_ARRAY:
- return new PlainBinaryDictionary(dictionaryPage, descriptor.getTypeLength());
- case INT96:
- return new PlainBinaryDictionary(dictionaryPage, 12);
- case INT64:
- return new PlainLongDictionary(dictionaryPage);
- case DOUBLE:
- return new PlainDoubleDictionary(dictionaryPage);
- case INT32:
- return new PlainIntegerDictionary(dictionaryPage);
- case FLOAT:
- return new PlainFloatDictionary(dictionaryPage);
- default:
- throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
- }
-
- }
- },
-
- /**
- * Actually a combination of bit packing and run length encoding.
- * TODO: Should we rename this to be more clear?
- */
- RLE {
- @Override
- public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
- int bitWidth = BytesUtils.getWidthFromMaxInt(getMaxLevel(descriptor, valuesType));
- if(bitWidth == 0) {
- return new ZeroIntegerValuesReader();
- }
- return new RunLengthBitPackingHybridValuesReader(bitWidth);
- }
- },
-
- /**
- * @deprecated This is no longer used, and has been replaced by {@link #RLE}
- * which is combination of bit packing and rle
- */
- @Deprecated
- BIT_PACKED {
- @Override
- public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
- return new ByteBitPackingValuesReader(getMaxLevel(descriptor, valuesType), BIG_ENDIAN);
- }
- },
-
- /**
- * @deprecated now replaced by RLE_DICTIONARY for the data page encoding and PLAIN for the dictionary page encoding
- */
- @Deprecated
- PLAIN_DICTIONARY {
- @Override
- public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
- return RLE_DICTIONARY.getDictionaryBasedValuesReader(descriptor, valuesType, dictionary);
- }
-
- @Override
- public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
- return PLAIN.initDictionary(descriptor, dictionaryPage);
- }
-
- @Override
- public boolean usesDictionary() {
- return true;
- }
-
- },
-
- /**
- * Delta encoding for integers. This can be used for int columns and works best
- * on sorted data
- */
- DELTA_BINARY_PACKED {
- @Override
- public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
- if(descriptor.getType() != INT32) {
- throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for type INT32");
- }
- return new DeltaBinaryPackingValuesReader();
- }
- },
-
- /**
- * Encoding for byte arrays to separate the length values and the data. The lengths
- * are encoded using DELTA_BINARY_PACKED
- */
- DELTA_LENGTH_BYTE_ARRAY {
- @Override
- public ValuesReader getValuesReader(ColumnDescriptor descriptor,
- ValuesType valuesType) {
- if (descriptor.getType() != BINARY) {
- throw new ParquetDecodingException("Encoding DELTA_LENGTH_BYTE_ARRAY is only supported for type BINARY");
- }
- return new DeltaLengthByteArrayValuesReader();
- }
- },
-
- /**
- * Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.
- * Suffixes are stored as delta length byte arrays.
- */
- DELTA_BYTE_ARRAY {
- @Override
- public ValuesReader getValuesReader(ColumnDescriptor descriptor,
- ValuesType valuesType) {
- if (descriptor.getType() != BINARY) {
- throw new ParquetDecodingException("Encoding DELTA_BYTE_ARRAY is only supported for type BINARY");
- }
- return new DeltaByteArrayReader();
- }
- },
-
- /**
- * Dictionary encoding: the ids are encoded using the RLE encoding
- */
- RLE_DICTIONARY {
-
- @Override
- public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
- switch (descriptor.getType()) {
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- case INT96:
- case INT64:
- case DOUBLE:
- case INT32:
- case FLOAT:
- return new DictionaryValuesReader(dictionary);
- default:
- throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
- }
- }
-
- @Override
- public boolean usesDictionary() {
- return true;
- }
-
- };
-
- int getMaxLevel(ColumnDescriptor descriptor, ValuesType valuesType) {
- int maxLevel;
- switch (valuesType) {
- case REPETITION_LEVEL:
- maxLevel = descriptor.getMaxRepetitionLevel();
- break;
- case DEFINITION_LEVEL:
- maxLevel = descriptor.getMaxDefinitionLevel();
- break;
- case VALUES:
- if(descriptor.getType() == BOOLEAN) {
- maxLevel = 1;
- break;
- }
- default:
- throw new ParquetDecodingException("Unsupported encoding for values: " + this);
- }
- return maxLevel;
- }
-
- /**
- * @return whether this encoding requires a dictionary
- */
- public boolean usesDictionary() {
- return false;
- }
-
- /**
- * initializes a dictionary from a page
- * @param dictionaryPage
- * @return the corresponding dictionary
- */
- public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
- throw new UnsupportedOperationException(this.name() + " does not support dictionary");
- }
-
- /**
- * To read decoded values that don't require a dictionary
- *
- * @param descriptor the column to read
- * @param valuesType the type of values
- * @return the proper values reader for the given column
- * @throws UnsupportedOperationException if the encoding is dictionary based
- */
- public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
- throw new UnsupportedOperationException("Error decoding " + descriptor + ". " + this.name() + " is dictionary based");
- }
-
- /**
- * To read decoded values that require a dictionary
- *
- * @param descriptor the column to read
- * @param valuesType the type of values
- * @param dictionary the dictionary
- * @return the proper values reader for the given column
- * @throws UnsupportedOperationException if the encoding is not dictionary based
- */
- public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
- throw new UnsupportedOperationException(this.name() + " is not dictionary based");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
deleted file mode 100644
index 31bec36..0000000
--- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
-import static parquet.column.Encoding.PLAIN;
-import static parquet.column.Encoding.PLAIN_DICTIONARY;
-import static parquet.column.Encoding.RLE_DICTIONARY;
-import parquet.column.impl.ColumnWriteStoreV1;
-import parquet.column.impl.ColumnWriteStoreV2;
-import parquet.column.page.PageWriteStore;
-import parquet.column.values.ValuesWriter;
-import parquet.column.values.boundedint.DevNullValuesWriter;
-import parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
-import parquet.column.values.deltastrings.DeltaByteArrayWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
-import parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
-import parquet.column.values.fallback.FallbackValuesWriter;
-import parquet.column.values.plain.BooleanPlainValuesWriter;
-import parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
-import parquet.column.values.plain.PlainValuesWriter;
-import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import parquet.schema.MessageType;
-
-/**
- * This class represents all the configurable Parquet properties.
- *
- * @author amokashi
- *
- */
-public class ParquetProperties {
-
- public enum WriterVersion {
- PARQUET_1_0 ("v1"),
- PARQUET_2_0 ("v2");
-
- private final String shortName;
-
- WriterVersion(String shortname) {
- this.shortName = shortname;
- }
-
- public static WriterVersion fromString(String name) {
- for (WriterVersion v : WriterVersion.values()) {
- if (v.shortName.equals(name)) {
- return v;
- }
- }
- // Throws IllegalArgumentException if name does not exact match with enum name
- return WriterVersion.valueOf(name);
- }
- }
- private final int dictionaryPageSizeThreshold;
- private final WriterVersion writerVersion;
- private final boolean enableDictionary;
-
- public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
- this.dictionaryPageSizeThreshold = dictPageSize;
- this.writerVersion = writerVersion;
- this.enableDictionary = enableDict;
- }
-
- public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
- if (maxLevel == 0) {
- return new DevNullValuesWriter();
- } else {
- return new RunLengthBitPackingHybridValuesWriter(
- getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
- }
- }
-
- private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
- switch (path.getType()) {
- case BOOLEAN:
- return new BooleanPlainValuesWriter();
- case INT96:
- return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
- case FIXED_LEN_BYTE_ARRAY:
- return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
- case BINARY:
- case INT32:
- case INT64:
- case DOUBLE:
- case FLOAT:
- return new PlainValuesWriter(initialSizePerCol, pageSize);
- default:
- throw new IllegalArgumentException("Unknown type " + path.getType());
- }
- }
-
- private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initialSizePerCol) {
- Encoding encodingForDataPage;
- Encoding encodingForDictionaryPage;
- switch(writerVersion) {
- case PARQUET_1_0:
- encodingForDataPage = PLAIN_DICTIONARY;
- encodingForDictionaryPage = PLAIN_DICTIONARY;
- break;
- case PARQUET_2_0:
- encodingForDataPage = RLE_DICTIONARY;
- encodingForDictionaryPage = PLAIN;
- break;
- default:
- throw new IllegalArgumentException("Unknown version: " + writerVersion);
- }
- switch (path.getType()) {
- case BOOLEAN:
- throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
- case BINARY:
- return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
- case INT32:
- return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
- case INT64:
- return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
- case INT96:
- return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage);
- case DOUBLE:
- return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
- case FLOAT:
- return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
- case FIXED_LEN_BYTE_ARRAY:
- return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage);
- default:
- throw new IllegalArgumentException("Unknown type " + path.getType());
- }
- }
-
- private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
- switch(writerVersion) {
- case PARQUET_1_0:
- return plainWriter(path, initialSizePerCol, pageSize);
- case PARQUET_2_0:
- switch (path.getType()) {
- case BOOLEAN:
- return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
- case INT32:
- return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
- case INT96:
- case INT64:
- case DOUBLE:
- case FLOAT:
- return plainWriter(path, initialSizePerCol, pageSize);
- default:
- throw new IllegalArgumentException("Unknown type " + path.getType());
- }
- default:
- throw new IllegalArgumentException("Unknown version: " + writerVersion);
- }
- }
-
- private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
- ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
- if (enableDictionary) {
- return FallbackValuesWriter.of(
- dictionaryWriter(path, initialSizePerCol),
- writerToFallBackTo);
- } else {
- return writerToFallBackTo;
- }
- }
-
- public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
- switch (path.getType()) {
- case BOOLEAN: // no dictionary encoding for boolean
- return writerToFallbackTo(path, initialSizePerCol, pageSize);
- case FIXED_LEN_BYTE_ARRAY:
- // dictionary encoding for that type was not enabled in PARQUET 1.0
- if (writerVersion == WriterVersion.PARQUET_2_0) {
- return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
- } else {
- return writerToFallbackTo(path, initialSizePerCol, pageSize);
- }
- case BINARY:
- case INT32:
- case INT64:
- case INT96:
- case DOUBLE:
- case FLOAT:
- return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
- default:
- throw new IllegalArgumentException("Unknown type " + path.getType());
- }
- }
-
- public int getDictionaryPageSizeThreshold() {
- return dictionaryPageSizeThreshold;
- }
-
- public WriterVersion getWriterVersion() {
- return writerVersion;
- }
-
- public boolean isEnableDictionary() {
- return enableDictionary;
- }
-
- public ColumnWriteStore newColumnWriteStore(
- MessageType schema,
- PageWriteStore pageStore,
- int pageSize) {
- switch (writerVersion) {
- case PARQUET_1_0:
- return new ColumnWriteStoreV1(
- pageStore,
- pageSize,
- dictionaryPageSizeThreshold,
- enableDictionary, writerVersion);
- case PARQUET_2_0:
- return new ColumnWriteStoreV2(
- schema,
- pageStore,
- pageSize,
- new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
- default:
- throw new IllegalArgumentException("unknown version " + writerVersion);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/UnknownColumnException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/UnknownColumnException.java b/parquet-column/src/main/java/parquet/column/UnknownColumnException.java
deleted file mode 100644
index c3b7dc8..0000000
--- a/parquet-column/src/main/java/parquet/column/UnknownColumnException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import parquet.ParquetRuntimeException;
-
-/**
- * Thrown if the specified column is unknown in the underlying storage
- *
- * @author Julien Le Dem
- */
-public class UnknownColumnException extends ParquetRuntimeException {
- private static final long serialVersionUID = 1L;
-
- private final ColumnDescriptor descriptor;
-
- public UnknownColumnException(ColumnDescriptor descriptor) {
- super("Column not found: " + descriptor.toString());
- this.descriptor = descriptor;
- }
-
- public ColumnDescriptor getDescriptor() {
- return descriptor;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java b/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java
deleted file mode 100644
index 32b3016..0000000
--- a/parquet-column/src/main/java/parquet/column/UnknownColumnTypeException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-import parquet.ParquetRuntimeException;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-/**
- * Thrown if the specified column type is unknown in the underlying storage
- *
- * @author Katya Gonina
- */
-public class UnknownColumnTypeException extends ParquetRuntimeException {
- private static final long serialVersionUID = 1L;
-
- private final PrimitiveTypeName type;
-
- public UnknownColumnTypeException(PrimitiveTypeName type) {
- super("Column type not found: " + type.toString());
- this.type= type;
- }
-
- public PrimitiveTypeName getType() {
- return this.type;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/ValuesType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ValuesType.java b/parquet-column/src/main/java/parquet/column/ValuesType.java
deleted file mode 100644
index f6e9322..0000000
--- a/parquet-column/src/main/java/parquet/column/ValuesType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column;
-
-/**
- * The different type of values we can store in columns
- *
- * @author Julien Le Dem
- *
- */
-public enum ValuesType {
- REPETITION_LEVEL, DEFINITION_LEVEL, VALUES;
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java
deleted file mode 100644
index a98919f..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReadStoreImpl.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnReadStore;
-import parquet.column.ColumnReader;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-/**
- * Implementation of the ColumnReadStore
- *
- * Initializes individual columns based on schema and converter
- *
- * @author Julien Le Dem
- *
- */
-public class ColumnReadStoreImpl implements ColumnReadStore {
-
- private final PageReadStore pageReadStore;
- private final GroupConverter recordConverter;
- private final MessageType schema;
-
- /**
- * @param pageReadStore underlying page storage
- * @param recordConverter the user provided converter to materialize records
- * @param schema the schema we are reading
- */
- public ColumnReadStoreImpl(PageReadStore pageReadStore, GroupConverter recordConverter, MessageType schema) {
- super();
- this.pageReadStore = pageReadStore;
- this.recordConverter = recordConverter;
- this.schema = schema;
- }
-
- @Override
- public ColumnReader getColumnReader(ColumnDescriptor path) {
- return newMemColumnReader(path, pageReadStore.getPageReader(path));
- }
-
- private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
- PrimitiveConverter converter = getPrimitiveConverter(path);
- return new ColumnReaderImpl(path, pageReader, converter);
- }
-
- private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {
- Type currentType = schema;
- Converter currentConverter = recordConverter;
- for (String fieldName : path.getPath()) {
- final GroupType groupType = currentType.asGroupType();
- int fieldIndex = groupType.getFieldIndex(fieldName);
- currentType = groupType.getType(fieldName);
- currentConverter = currentConverter.asGroupConverter().getConverter(fieldIndex);
- }
- PrimitiveConverter converter = currentConverter.asPrimitiveConverter();
- return converter;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
deleted file mode 100644
index e2a6a3a..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
+++ /dev/null
@@ -1,661 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-import static parquet.column.ValuesType.DEFINITION_LEVEL;
-import static parquet.column.ValuesType.REPETITION_LEVEL;
-import static parquet.column.ValuesType.VALUES;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnReader;
-import parquet.column.Dictionary;
-import parquet.column.Encoding;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageReader;
-import parquet.column.values.ValuesReader;
-import parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.Binary;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
-
-/**
- * ColumnReader implementation
- *
- * @author Julien Le Dem
- *
- */
class ColumnReaderImpl implements ColumnReader {
  private static final Log LOG = Log.getLog(ColumnReaderImpl.class);

  /**
   * binds the lower level page decoder to the record converter materializing the records
   *
   * <p>Exactly one concrete binding is active at a time (installed by bind() or
   * bindToDictionary()); the typed getters that do not apply to the bound type
   * keep the UnsupportedOperationException defaults below.
   *
   * @author Julien Le Dem
   *
   */
  private static abstract class Binding {

    /**
     * read one value from the underlying page
     */
    abstract void read();

    /**
     * skip one value from the underlying page
     */
    abstract void skip();

    /**
     * write current value to converter
     */
    abstract void writeValue();

    /**
     * @return current value as a dictionary id (dictionary-bound columns only)
     */
    public int getDictionaryId() {
      throw new UnsupportedOperationException();
    }

    /**
     * @return current value as an int
     */
    public int getInteger() {
      throw new UnsupportedOperationException();
    }

    /**
     * @return current value as a boolean
     */
    public boolean getBoolean() {
      throw new UnsupportedOperationException();
    }

    /**
     * @return current value as a long
     */
    public long getLong() {
      throw new UnsupportedOperationException();
    }

    /**
     * @return current value as a Binary
     */
    public Binary getBinary() {
      throw new UnsupportedOperationException();
    }

    /**
     * @return current value as a float
     */
    public float getFloat() {
      throw new UnsupportedOperationException();
    }

    /**
     * @return current value as a double
     */
    public double getDouble() {
      throw new UnsupportedOperationException();
    }
  }

  private final ColumnDescriptor path;
  // total number of values in this column chunk, as reported by the page reader
  private final long totalValueCount;
  private final PageReader pageReader;
  // decoded dictionary for this column, or null when there is no dictionary page
  private final Dictionary dictionary;

  // per-page decoders, re-initialized each time a new page is loaded
  private IntIterator repetitionLevelColumn;
  private IntIterator definitionLevelColumn;
  protected ValuesReader dataColumn;

  // state of the current triplet (levels read eagerly, value read lazily)
  private int repetitionLevel;
  private int definitionLevel;
  private int dictionaryId;

  // value of readValues at which the current page is exhausted
  private long endOfPageValueCount;
  // number of triplets consumed so far across all pages
  private int readValues;
  // number of values in the current page (set by initDataReader)
  private int pageValueCount;

  private final PrimitiveConverter converter;
  // active decoder-to-converter binding for the current page's value encoding
  private Binding binding;

  // this is needed because we will attempt to read the value twice when filtering
  // TODO: rework that
  private boolean valueRead;

  // installs a binding that keeps values as dictionary ids and decodes on demand
  private void bindToDictionary(final Dictionary dictionary) {
    binding =
        new Binding() {
          void read() {
            dictionaryId = dataColumn.readValueDictionaryId();
          }
          public void skip() {
            dataColumn.skip();
          }
          public int getDictionaryId() {
            return dictionaryId;
          }
          void writeValue() {
            converter.addValueFromDictionary(dictionaryId);
          }
          public int getInteger() {
            return dictionary.decodeToInt(dictionaryId);
          }
          public boolean getBoolean() {
            return dictionary.decodeToBoolean(dictionaryId);
          }
          public long getLong() {
            return dictionary.decodeToLong(dictionaryId);
          }
          public Binary getBinary() {
            return dictionary.decodeToBinary(dictionaryId);
          }
          public float getFloat() {
            return dictionary.decodeToFloat(dictionaryId);
          }
          public double getDouble() {
            return dictionary.decodeToDouble(dictionaryId);
          }
        };
  }

  // installs a plain (non-dictionary) binding matching the column's primitive type
  private void bind(PrimitiveTypeName type) {
    binding = type.convert(new PrimitiveTypeNameConverter<Binding, RuntimeException>() {
      @Override
      public Binding convertFLOAT(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return new Binding() {
          float current;
          void read() {
            current = dataColumn.readFloat();
          }
          public void skip() {
            current = 0;
            dataColumn.skip();
          }
          public float getFloat() {
            return current;
          }
          void writeValue() {
            converter.addFloat(current);
          }
        };
      }
      @Override
      public Binding convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return new Binding() {
          double current;
          void read() {
            current = dataColumn.readDouble();
          }
          public void skip() {
            current = 0;
            dataColumn.skip();
          }
          public double getDouble() {
            return current;
          }
          void writeValue() {
            converter.addDouble(current);
          }
        };
      }
      @Override
      public Binding convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return new Binding() {
          int current;
          void read() {
            current = dataColumn.readInteger();
          }
          public void skip() {
            current = 0;
            dataColumn.skip();
          }
          @Override
          public int getInteger() {
            return current;
          }
          void writeValue() {
            converter.addInt(current);
          }
        };
      }
      @Override
      public Binding convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return new Binding() {
          long current;
          void read() {
            current = dataColumn.readLong();
          }
          public void skip() {
            current = 0;
            dataColumn.skip();
          }
          @Override
          public long getLong() {
            return current;
          }
          void writeValue() {
            converter.addLong(current);
          }
        };
      }
      // INT96 and FIXED_LEN_BYTE_ARRAY are surfaced to converters as Binary
      @Override
      public Binding convertINT96(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return this.convertBINARY(primitiveTypeName);
      }
      @Override
      public Binding convertFIXED_LEN_BYTE_ARRAY(
          PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return this.convertBINARY(primitiveTypeName);
      }
      @Override
      public Binding convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return new Binding() {
          boolean current;
          void read() {
            current = dataColumn.readBoolean();
          }
          public void skip() {
            current = false;
            dataColumn.skip();
          }
          @Override
          public boolean getBoolean() {
            return current;
          }
          void writeValue() {
            converter.addBoolean(current);
          }
        };
      }
      @Override
      public Binding convertBINARY(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
        return new Binding() {
          Binary current;
          void read() {
            current = dataColumn.readBytes();
          }
          public void skip() {
            current = null;
            dataColumn.skip();
          }
          @Override
          public Binary getBinary() {
            return current;
          }
          void writeValue() {
            converter.addBinary(current);
          }
        };
      }
    });
  }

  /**
   * creates a reader for triplets
   * @param path the descriptor for the corresponding column
   * @param pageReader the underlying store to read from
   * @param converter the converter that receives materialized values
   * @throws ParquetDecodingException if the dictionary cannot be decoded or the chunk is empty
   */
  public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter) {
    this.path = checkNotNull(path, "path");
    this.pageReader = checkNotNull(pageReader, "pageReader");
    this.converter = checkNotNull(converter, "converter");
    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
    if (dictionaryPage != null) {
      try {
        this.dictionary = dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
        if (converter.hasDictionarySupport()) {
          converter.setDictionary(dictionary);
        }
      } catch (IOException e) {
        throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
      }
    } else {
      this.dictionary = null;
    }
    this.totalValueCount = pageReader.getTotalValueCount();
    if (totalValueCount == 0) {
      throw new ParquetDecodingException("totalValueCount == 0");
    }
    // position on the first triplet (loads the first page)
    consume();
  }

  // true once every value of the column chunk has been consumed
  private boolean isFullyConsumed() {
    return readValues >= totalValueCount;
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#writeCurrentValueToConverter()
   */
  @Override
  public void writeCurrentValueToConverter() {
    readValue();
    this.binding.writeValue();
  }

  @Override
  public int getCurrentValueDictionaryID() {
    readValue();
    return binding.getDictionaryId();
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getInteger()
   */
  @Override
  public int getInteger() {
    readValue();
    return this.binding.getInteger();
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getBoolean()
   */
  @Override
  public boolean getBoolean() {
    readValue();
    return this.binding.getBoolean();
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getLong()
   */
  @Override
  public long getLong() {
    readValue();
    return this.binding.getLong();
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getBinary()
   */
  @Override
  public Binary getBinary() {
    readValue();
    return this.binding.getBinary();
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getFloat()
   */
  @Override
  public float getFloat() {
    readValue();
    return this.binding.getFloat();
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getDouble()
   */
  @Override
  public double getDouble() {
    readValue();
    return this.binding.getDouble();
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getCurrentRepetitionLevel()
   */
  @Override
  public int getCurrentRepetitionLevel() {
    return repetitionLevel;
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getDescriptor()
   */
  @Override
  public ColumnDescriptor getDescriptor() {
    return path;
  }

  /**
   * Reads the value into the binding.
   * Idempotent until the next consume(): valueRead guards against decoding the
   * same value twice (callers may probe the value more than once when filtering).
   */
  public void readValue() {
    try {
      if (!valueRead) {
        binding.read();
        valueRead = true;
      }
    } catch (RuntimeException e) {
      throw new ParquetDecodingException(
          format(
              "Can't read value in column %s at value %d out of %d, %d out of %d in currentPage. repetition level: %d, definition level: %d",
              path, readValues, totalValueCount, readValues - (endOfPageValueCount - pageValueCount), pageValueCount, repetitionLevel, definitionLevel),
          e);
    }
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#skip()
   */
  @Override
  public void skip() {
    if (!valueRead) {
      binding.skip();
      valueRead = true;
    }
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getCurrentDefinitionLevel()
   */
  @Override
  public int getCurrentDefinitionLevel() {
    return definitionLevel;
  }

  // TODO: change the logic around read() to not tie together reading from the 3 columns
  private void readRepetitionAndDefinitionLevels() {
    repetitionLevel = repetitionLevelColumn.nextInt();
    definitionLevel = definitionLevelColumn.nextInt();
    ++readValues;
  }

  // advances to the next triplet, loading the next page when the current one is exhausted
  private void checkRead() {
    if (isPageFullyConsumed()) {
      if (isFullyConsumed()) {
        if (DEBUG) LOG.debug("end reached");
        repetitionLevel = 0; // the next repetition level
        return;
      }
      readPage();
    }
    readRepetitionAndDefinitionLevels();
  }

  // loads the next page, dispatching on the page format version
  private void readPage() {
    if (DEBUG) LOG.debug("loading page");
    DataPage page = pageReader.readPage();
    page.accept(new DataPage.Visitor<Void>() {
      @Override
      public Void visit(DataPageV1 dataPageV1) {
        readPageV1(dataPageV1);
        return null;
      }
      @Override
      public Void visit(DataPageV2 dataPageV2) {
        readPageV2(dataPageV2);
        return null;
      }
    });
  }

  /**
   * Sets up the values reader for the new page's data section and installs the
   * matching binding (dictionary-based or plain, depending on the encoding and
   * on whether the converter accepts dictionary ids directly).
   */
  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
    this.pageValueCount = valueCount;
    this.endOfPageValueCount = readValues + pageValueCount;
    if (dataEncoding.usesDictionary()) {
      if (dictionary == null) {
        throw new ParquetDecodingException(
            "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
      }
      this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
    } else {
      this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
    }
    if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
      bindToDictionary(dictionary);
    } else {
      bind(path.getType());
    }
    try {
      dataColumn.initFromPage(pageValueCount, bytes, offset);
    } catch (IOException e) {
      throw new ParquetDecodingException("could not read page in col " + path, e);
    }
  }

  // V1 pages store repetition levels, definition levels and data contiguously in one buffer
  private void readPageV1(DataPageV1 page) {
    ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
    ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
    try {
      byte[] bytes = page.getBytes().toByteArray();
      if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
      if (DEBUG) LOG.debug("reading repetition levels at 0");
      // NOTE(review): pageValueCount still holds the PREVIOUS page's count here —
      // it is only updated to page.getValueCount() inside initDataReader below.
      // The level readers appear to rely on the page data itself for their actual
      // length; confirm before relying on this count.
      rlReader.initFromPage(pageValueCount, bytes, 0);
      int next = rlReader.getNextOffset();
      if (DEBUG) LOG.debug("reading definition levels at " + next);
      dlReader.initFromPage(pageValueCount, bytes, next);
      next = dlReader.getNextOffset();
      if (DEBUG) LOG.debug("reading data at " + next);
      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
    } catch (IOException e) {
      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
    }
  }

  // V2 pages carry the levels as separate RLE-encoded sections
  private void readPageV2(DataPageV2 page) {
    this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
    this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
    try {
      if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
    } catch (IOException e) {
      throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
    }
  }

  // a max level of 0 means the level is always 0 and nothing was encoded for it
  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
    try {
      if (maxLevel == 0) {
        return new NullIntIterator();
      }
      return new RLEIntIterator(
          new RunLengthBitPackingHybridDecoder(
              BytesUtils.getWidthFromMaxInt(maxLevel),
              new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new ParquetDecodingException("could not read levels in page for col " + path, e);
    }
  }

  private boolean isPageFullyConsumed() {
    return readValues >= endOfPageValueCount;
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#consume()
   */
  @Override
  public void consume() {
    checkRead();
    valueRead = false;
  }

  /**
   * {@inheritDoc}
   * @see parquet.column.ColumnReader#getTotalValueCount()
   */
  @Override
  public long getTotalValueCount() {
    return totalValueCount;
  }

  // minimal int iterator abstraction over the two level-encoding schemes
  static abstract class IntIterator {
    abstract int nextInt();
  }

  // levels decoded by a generic ValuesReader (V1 pages)
  static class ValuesReaderIntIterator extends IntIterator {
    ValuesReader delegate;

    public ValuesReaderIntIterator(ValuesReader delegate) {
      super();
      this.delegate = delegate;
    }

    @Override
    int nextInt() {
      return delegate.readInteger();
    }
  }

  // levels decoded from a standalone RLE section (V2 pages)
  static class RLEIntIterator extends IntIterator {
    RunLengthBitPackingHybridDecoder delegate;

    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
      this.delegate = delegate;
    }

    @Override
    int nextInt() {
      try {
        return delegate.readInt();
      } catch (IOException e) {
        throw new ParquetDecodingException(e);
      }
    }
  }

  // used when max level is 0: every level is trivially 0
  private static final class NullIntIterator extends IntIterator {
    @Override
    int nextInt() {
      return 0;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
deleted file mode 100644
index 2c476ca..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-
-public class ColumnWriteStoreV1 implements ColumnWriteStore {
-
- private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
- private final PageWriteStore pageWriteStore;
- private final int pageSizeThreshold;
- private final int dictionaryPageSizeThreshold;
- private final boolean enableDictionary;
- private final WriterVersion writerVersion;
-
- public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
- super();
- this.pageWriteStore = pageWriteStore;
- this.pageSizeThreshold = pageSizeThreshold;
- this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
- this.enableDictionary = enableDictionary;
- this.writerVersion = writerVersion;
- }
-
- public ColumnWriter getColumnWriter(ColumnDescriptor path) {
- ColumnWriterV1 column = columns.get(path);
- if (column == null) {
- column = newMemColumn(path);
- columns.put(path, column);
- }
- return column;
- }
-
- public Set<ColumnDescriptor> getColumnDescriptors() {
- return columns.keySet();
- }
-
- private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
- PageWriter pageWriter = pageWriteStore.getPageWriter(path);
- return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Entry<ColumnDescriptor, ColumnWriterV1> entry : columns.entrySet()) {
- sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
- sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
- sb.append("\n");
- }
- return sb.toString();
- }
-
- @Override
- public long getAllocatedSize() {
- Collection<ColumnWriterV1> values = columns.values();
- long total = 0;
- for (ColumnWriterV1 memColumn : values) {
- total += memColumn.allocatedSize();
- }
- return total;
- }
-
- @Override
- public long getBufferedSize() {
- Collection<ColumnWriterV1> values = columns.values();
- long total = 0;
- for (ColumnWriterV1 memColumn : values) {
- total += memColumn.getBufferedSizeInMemory();
- }
- return total;
- }
-
- @Override
- public String memUsageString() {
- StringBuilder b = new StringBuilder("Store {\n");
- Collection<ColumnWriterV1> values = columns.values();
- for (ColumnWriterV1 memColumn : values) {
- b.append(memColumn.memUsageString(" "));
- }
- b.append("}\n");
- return b.toString();
- }
-
- public long maxColMemSize() {
- Collection<ColumnWriterV1> values = columns.values();
- long max = 0;
- for (ColumnWriterV1 memColumn : values) {
- max = Math.max(max, memColumn.getBufferedSizeInMemory());
- }
- return max;
- }
-
- @Override
- public void flush() {
- Collection<ColumnWriterV1> values = columns.values();
- for (ColumnWriterV1 memColumn : values) {
- memColumn.flush();
- }
- }
-
- @Override
- public void endRecord() {
- // V1 does not take record boundaries into account
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
deleted file mode 100644
index 2dc342e..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.util.Collections.unmodifiableMap;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.schema.MessageType;
-
-public class ColumnWriteStoreV2 implements ColumnWriteStore {
-
- // will wait for at least that many records before checking again
- private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
- private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
- // will flush even if size bellow the threshold by this much to facilitate page alignment
- private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
-
- private final Map<ColumnDescriptor, ColumnWriterV2> columns;
- private final Collection<ColumnWriterV2> writers;
- private long rowCount;
- private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
- private final long thresholdTolerance;
-
- private int pageSizeThreshold;
-
- public ColumnWriteStoreV2(
- MessageType schema,
- PageWriteStore pageWriteStore,
- int pageSizeThreshold,
- ParquetProperties parquetProps) {
- super();
- this.pageSizeThreshold = pageSizeThreshold;
- this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
- Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
- for (ColumnDescriptor path : schema.getColumns()) {
- PageWriter pageWriter = pageWriteStore.getPageWriter(path);
- mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
- }
- this.columns = unmodifiableMap(mcolumns);
- this.writers = this.columns.values();
- }
-
- public ColumnWriter getColumnWriter(ColumnDescriptor path) {
- return columns.get(path);
- }
-
- public Set<ColumnDescriptor> getColumnDescriptors() {
- return columns.keySet();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
- sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
- sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
- sb.append("\n");
- }
- return sb.toString();
- }
-
- @Override
- public long getAllocatedSize() {
- long total = 0;
- for (ColumnWriterV2 memColumn : columns.values()) {
- total += memColumn.allocatedSize();
- }
- return total;
- }
-
- @Override
- public long getBufferedSize() {
- long total = 0;
- for (ColumnWriterV2 memColumn : columns.values()) {
- total += memColumn.getTotalBufferedSize();
- }
- return total;
- }
-
- @Override
- public void flush() {
- for (ColumnWriterV2 memColumn : columns.values()) {
- long rows = rowCount - memColumn.getRowsWrittenSoFar();
- if (rows > 0) {
- memColumn.writePage(rowCount);
- }
- memColumn.finalizeColumnChunk();
- }
- }
-
- public String memUsageString() {
- StringBuilder b = new StringBuilder("Store {\n");
- for (ColumnWriterV2 memColumn : columns.values()) {
- b.append(memColumn.memUsageString(" "));
- }
- b.append("}\n");
- return b.toString();
- }
-
- @Override
- public void endRecord() {
- ++ rowCount;
- if (rowCount >= rowCountForNextSizeCheck) {
- sizeCheck();
- }
- }
-
- private void sizeCheck() {
- long minRecordToWait = Long.MAX_VALUE;
- for (ColumnWriterV2 writer : writers) {
- long usedMem = writer.getCurrentPageBufferedSize();
- long rows = rowCount - writer.getRowsWrittenSoFar();
- long remainingMem = pageSizeThreshold - usedMem;
- if (remainingMem <= thresholdTolerance) {
- writer.writePage(rowCount);
- remainingMem = pageSizeThreshold;
- }
- long rowsToFillPage =
- usedMem == 0 ?
- MAXIMUM_RECORD_COUNT_FOR_CHECK
- : (long)((float)rows) / usedMem * remainingMem;
- if (rowsToFillPage < minRecordToWait) {
- minRecordToWait = rowsToFillPage;
- }
- }
- if (minRecordToWait == Long.MAX_VALUE) {
- minRecordToWait = MINIMUM_RECORD_COUNT_FOR_CHECK;
- }
- // will check again halfway
- rowCountForNextSizeCheck = rowCount +
- min(
- max(minRecordToWait / 2, MINIMUM_RECORD_COUNT_FOR_CHECK), // no less than MINIMUM_RECORD_COUNT_FOR_CHECK
- MAXIMUM_RECORD_COUNT_FOR_CHECK); // no more than MAXIMUM_RECORD_COUNT_FOR_CHECK
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
deleted file mode 100644
index ce4af80..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static parquet.bytes.BytesInput.concat;
-
-import java.io.IOException;
-
-import parquet.Log;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.values.ValuesWriter;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-
-import static java.lang.Math.max;
-import static java.lang.Math.pow;
-
-/**
- * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
- *
- * @author Julien Le Dem
- *
- */
-final class ColumnWriterV1 implements ColumnWriter {
- private static final Log LOG = Log.getLog(ColumnWriterV1.class);
- private static final boolean DEBUG = Log.DEBUG;
- private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
- private static final int MIN_SLAB_SIZE = 64;
-
- private final ColumnDescriptor path;
- private final PageWriter pageWriter;
- private final long pageSizeThreshold;
- private ValuesWriter repetitionLevelColumn;
- private ValuesWriter definitionLevelColumn;
- private ValuesWriter dataColumn;
- private int valueCount;
- private int valueCountForNextSizeCheck;
-
- private Statistics statistics;
-
- public ColumnWriterV1(
- ColumnDescriptor path,
- PageWriter pageWriter,
- int pageSizeThreshold,
- int dictionaryPageSizeThreshold,
- boolean enableDictionary,
- WriterVersion writerVersion) {
- this.path = path;
- this.pageWriter = pageWriter;
- this.pageSizeThreshold = pageSizeThreshold;
- // initial check of memory usage. So that we have enough data to make an initial prediction
- this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
- resetStatistics();
-
- ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
-
- this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
- this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
-
- int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
- this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
- }
-
- private void log(Object value, int r, int d) {
- LOG.debug(path + " " + value + " r:" + r + " d:" + d);
- }
-
- private void resetStatistics() {
- this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
- }
-
- /**
- * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
- *
- * We measure the memory used when we reach the mid point toward our estimated count.
- * We then update the estimate and flush the page if we reached the threshold.
- *
- * That way we check the memory size log2(n) times.
- *
- */
- private void accountForValueWritten() {
- ++ valueCount;
- if (valueCount > valueCountForNextSizeCheck) {
- // not checking the memory used for every value
- long memSize = repetitionLevelColumn.getBufferedSize()
- + definitionLevelColumn.getBufferedSize()
- + dataColumn.getBufferedSize();
- if (memSize > pageSizeThreshold) {
- // we will write the current page and check again the size at the predicted middle of next page
- valueCountForNextSizeCheck = valueCount / 2;
- writePage();
- } else {
- // not reached the threshold, will check again midway
- valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
- }
- }
- }
-
- private void updateStatisticsNumNulls() {
- statistics.incrementNumNulls();
- }
-
- private void updateStatistics(int value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(long value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(float value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(double value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(Binary value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(boolean value) {
- statistics.updateStats(value);
- }
-
- private void writePage() {
- if (DEBUG) LOG.debug("write page");
- try {
- pageWriter.writePage(
- concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
- valueCount,
- statistics,
- repetitionLevelColumn.getEncoding(),
- definitionLevelColumn.getEncoding(),
- dataColumn.getEncoding());
- } catch (IOException e) {
- throw new ParquetEncodingException("could not write page for " + path, e);
- }
- repetitionLevelColumn.reset();
- definitionLevelColumn.reset();
- dataColumn.reset();
- valueCount = 0;
- resetStatistics();
- }
-
- @Override
- public void writeNull(int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(null, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- updateStatisticsNumNulls();
- accountForValueWritten();
- }
-
- @Override
- public void write(double value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeDouble(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(float value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeFloat(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(Binary value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeBytes(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(boolean value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeBoolean(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(int value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeInteger(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(long value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeLong(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- public void flush() {
- if (valueCount > 0) {
- writePage();
- }
- final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
- if (dictionaryPage != null) {
- if (DEBUG) LOG.debug("write dictionary");
- try {
- pageWriter.writeDictionaryPage(dictionaryPage);
- } catch (IOException e) {
- throw new ParquetEncodingException("could not write dictionary page for " + path, e);
- }
- dataColumn.resetDictionary();
- }
- }
-
- public long getBufferedSizeInMemory() {
- return repetitionLevelColumn.getBufferedSize()
- + definitionLevelColumn.getBufferedSize()
- + dataColumn.getBufferedSize()
- + pageWriter.getMemSize();
- }
-
- public long allocatedSize() {
- return repetitionLevelColumn.getAllocatedSize()
- + definitionLevelColumn.getAllocatedSize()
- + dataColumn.getAllocatedSize()
- + pageWriter.allocatedSize();
- }
-
- public String memUsageString(String indent) {
- StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
- b.append(repetitionLevelColumn.memUsageString(indent + " r:")).append("\n");
- b.append(definitionLevelColumn.memUsageString(indent + " d:")).append("\n");
- b.append(dataColumn.memUsageString(indent + " data:")).append("\n");
- b.append(pageWriter.memUsageString(indent + " pages:")).append("\n");
- b.append(indent).append(String.format(" total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n");
- b.append(indent).append("}\n");
- return b.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
deleted file mode 100644
index e7b8c1c..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static java.lang.Math.max;
-import static java.lang.Math.pow;
-import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
-
-import java.io.IOException;
-
-import parquet.Ints;
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.CapacityByteArrayOutputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriter;
-import parquet.column.Encoding;
-import parquet.column.ParquetProperties;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.values.ValuesWriter;
-import parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-
/**
 * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
 *
 * @author Julien Le Dem
 *
 */
final class ColumnWriterV2 implements ColumnWriter {
  private static final Log LOG = Log.getLog(ColumnWriterV2.class);
  private static final boolean DEBUG = Log.DEBUG;
  // smallest slab allocated for the level encoders and the data column
  private static final int MIN_SLAB_SIZE = 64;

  private final ColumnDescriptor path;
  private final PageWriter pageWriter;
  private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
  private RunLengthBitPackingHybridEncoder definitionLevelColumn;
  private ValuesWriter dataColumn;
  // number of values buffered in the current (not yet written) page
  private int valueCount;

  // statistics accumulated for the current page; reset on every writePage
  private Statistics<?> statistics;
  // number of rows already flushed to pages; see writePage(long)
  private long rowsWrittenSoFar = 0;

  /**
   * @param path descriptor of the column written by this writer
   * @param pageWriter sink receiving completed pages
   * @param parquetProps writer configuration (encodings, dictionary support, ...)
   * @param pageSize the page size threshold in bytes
   */
  public ColumnWriterV2(
      ColumnDescriptor path,
      PageWriter pageWriter,
      ParquetProperties parquetProps,
      int pageSize) {
    this.path = path;
    this.pageWriter = pageWriter;
    resetStatistics();

    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);

    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
  }

  // logs one written triplet; only called behind the DEBUG guard
  private void log(Object value, int r, int d) {
    LOG.debug(path + " " + value + " r:" + r + " d:" + d);
  }

  // starts a fresh statistics accumulator for the next page
  private void resetStatistics() {
    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
  }

  // encodes one definition level, converting IOException to the unchecked encoding exception
  private void definitionLevel(int definitionLevel) {
    try {
      definitionLevelColumn.writeInt(definitionLevel);
    } catch (IOException e) {
      throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
    }
  }

  // encodes one repetition level, converting IOException to the unchecked encoding exception
  private void repetitionLevel(int repetitionLevel) {
    try {
      repetitionLevelColumn.writeInt(repetitionLevel);
    } catch (IOException e) {
      throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
    }
  }

  /**
   * writes the current null value
   * @param repetitionLevel
   * @param definitionLevel
   */
  public void writeNull(int repetitionLevel, int definitionLevel) {
    if (DEBUG) log(null, repetitionLevel, definitionLevel);
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    statistics.incrementNumNulls();
    ++ valueCount;
  }

  /**
   * writes the current value
   * @param value
   * @param repetitionLevel
   * @param definitionLevel
   */
  public void write(double value, int repetitionLevel, int definitionLevel) {
    if (DEBUG) log(value, repetitionLevel, definitionLevel);
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeDouble(value);
    statistics.updateStats(value);
    ++ valueCount;
  }

  /**
   * writes the current value
   * @param value
   * @param repetitionLevel
   * @param definitionLevel
   */
  public void write(float value, int repetitionLevel, int definitionLevel) {
    if (DEBUG) log(value, repetitionLevel, definitionLevel);
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeFloat(value);
    statistics.updateStats(value);
    ++ valueCount;
  }

  /**
   * writes the current value
   * @param value
   * @param repetitionLevel
   * @param definitionLevel
   */
  public void write(Binary value, int repetitionLevel, int definitionLevel) {
    if (DEBUG) log(value, repetitionLevel, definitionLevel);
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeBytes(value);
    statistics.updateStats(value);
    ++ valueCount;
  }

  /**
   * writes the current value
   * @param value
   * @param repetitionLevel
   * @param definitionLevel
   */
  public void write(boolean value, int repetitionLevel, int definitionLevel) {
    if (DEBUG) log(value, repetitionLevel, definitionLevel);
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeBoolean(value);
    statistics.updateStats(value);
    ++ valueCount;
  }

  /**
   * writes the current value
   * @param value
   * @param repetitionLevel
   * @param definitionLevel
   */
  public void write(int value, int repetitionLevel, int definitionLevel) {
    if (DEBUG) log(value, repetitionLevel, definitionLevel);
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeInteger(value);
    statistics.updateStats(value);
    ++ valueCount;
  }

  /**
   * writes the current value
   * @param value
   * @param repetitionLevel
   * @param definitionLevel
   */
  public void write(long value, int repetitionLevel, int definitionLevel) {
    if (DEBUG) log(value, repetitionLevel, definitionLevel);
    repetitionLevel(repetitionLevel);
    definitionLevel(definitionLevel);
    dataColumn.writeLong(value);
    statistics.updateStats(value);
    ++ valueCount;
  }

  /**
   * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
   * Is called right after writePage
   */
  public void finalizeColumnChunk() {
    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
    if (dictionaryPage != null) {
      if (DEBUG) LOG.debug("write dictionary");
      try {
        pageWriter.writeDictionaryPage(dictionaryPage);
      } catch (IOException e) {
        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
      }
      dataColumn.resetDictionary();
    }
  }

  /**
   * used to decide when to write a page
   * @return the number of bytes of memory used to buffer the current data
   */
  public long getCurrentPageBufferedSize() {
    return repetitionLevelColumn.getBufferedSize()
        + definitionLevelColumn.getBufferedSize()
        + dataColumn.getBufferedSize();
  }

  /**
   * used to decide when to write a page or row group
   * @return the number of bytes of memory used to buffer the current data and the previously written pages
   */
  public long getTotalBufferedSize() {
    return repetitionLevelColumn.getBufferedSize()
        + definitionLevelColumn.getBufferedSize()
        + dataColumn.getBufferedSize()
        + pageWriter.getMemSize();
  }

  /**
   * @return actual memory used
   */
  public long allocatedSize() {
    return repetitionLevelColumn.getAllocatedSize()
        + definitionLevelColumn.getAllocatedSize()
        + dataColumn.getAllocatedSize()
        + pageWriter.allocatedSize();
  }

  /**
   * @param indent a prefix to format lines
   * @return a formatted string showing how memory is used
   */
  public String memUsageString(String indent) {
    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
    b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
    b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
    b.append(dataColumn.memUsageString(indent + " data:")).append("\n");
    b.append(pageWriter.memUsageString(indent + " pages:")).append("\n");
    b.append(indent).append(String.format(" total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
    b.append(indent).append("}\n");
    return b.toString();
  }

  /**
   * @return the number of rows already flushed to pages by this writer
   */
  public long getRowsWrittenSoFar() {
    return this.rowsWrittenSoFar;
  }

  /**
   * writes the current data to a new page in the page store
   * @param rowCount how many rows have been written so far
   */
  public void writePage(long rowCount) {
    // rows in this page = rows seen overall minus rows flushed by earlier pages
    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
    this.rowsWrittenSoFar = rowCount;
    if (DEBUG) LOG.debug("write page");
    try {
      // TODO: rework this API. Those must be called *in that order*
      BytesInput bytes = dataColumn.getBytes();
      Encoding encoding = dataColumn.getEncoding();
      pageWriter.writePageV2(
          pageRowCount,
          Ints.checkedCast(statistics.getNumNulls()),
          valueCount,
          path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
          path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
          encoding,
          bytes,
          statistics
          );
    } catch (IOException e) {
      throw new ParquetEncodingException("could not write page for " + path, e);
    }
    repetitionLevelColumn.reset();
    definitionLevelColumn.reset();
    dataColumn.reset();
    valueCount = 0;
    resetStatistics();
  }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/page/DataPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPage.java b/parquet-column/src/main/java/parquet/column/page/DataPage.java
deleted file mode 100644
index 8043fd0..0000000
--- a/parquet-column/src/main/java/parquet/column/page/DataPage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.page;
-
-/**
- * one data page in a chunk
- *
- * @author Julien Le Dem
- *
- */
-abstract public class DataPage extends Page {
-
- private final int valueCount;
-
- DataPage(int compressedSize, int uncompressedSize, int valueCount) {
- super(compressedSize, uncompressedSize);
- this.valueCount = valueCount;
- }
-
- /**
- * @return the number of values in that page
- */
- public int getValueCount() {
- return valueCount;
- }
-
- public abstract <T> T accept(Visitor<T> visitor);
-
- public static interface Visitor<T> {
-
- T visit(DataPageV1 dataPageV1);
-
- T visit(DataPageV2 dataPageV2);
-
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
deleted file mode 100644
index 867bb4a..0000000
--- a/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.page;
-
-import parquet.Ints;
-import parquet.bytes.BytesInput;
-import parquet.column.Encoding;
-import parquet.column.statistics.Statistics;
-
public class DataPageV1 extends DataPage {

  private final BytesInput bytes;
  private final Statistics<?> statistics;
  private final Encoding rlEncoding;
  private final Encoding dlEncoding;
  private final Encoding valuesEncoding;

  /**
   * @param bytes the bytes for this page
   * @param valueCount count of values in this page
   * @param uncompressedSize the uncompressed size of the page
   * @param stats statistics of the page's values (max, min, num_null)
   * @param rlEncoding the repetition level encoding for this page
   * @param dlEncoding the definition level encoding for this page
   * @param valuesEncoding the values encoding for this page
   */
  public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> stats, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount);
    this.bytes = bytes;
    this.statistics = stats;
    this.rlEncoding = rlEncoding;
    this.dlEncoding = dlEncoding;
    this.valuesEncoding = valuesEncoding;
  }

  /**
   * @return the bytes for the page
   */
  public BytesInput getBytes() {
    return bytes;
  }

  /**
   * @return the statistics for this page (max, min, num_nulls)
   */
  public Statistics<?> getStatistics() {
    return statistics;
  }

  /**
   * @return the definition level encoding for this page
   */
  public Encoding getDlEncoding() {
    return dlEncoding;
  }

  /**
   * @return the repetition level encoding for this page
   */
  public Encoding getRlEncoding() {
    return rlEncoding;
  }

  /**
   * @return the values encoding for this page
   */
  public Encoding getValueEncoding() {
    return valuesEncoding;
  }

  @Override
  public String toString() {
    return "Page [bytes.size=" + bytes.size() + ", valueCount=" + getValueCount() + ", uncompressedSize=" + getUncompressedSize() + "]";
  }

  @Override
  public <T> T accept(Visitor<T> visitor) {
    return visitor.visit(this);
  }
}