You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:38 UTC
[41/51] [partial] parquet-mr git commit: PARQUET-23: Rename to
org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
new file mode 100644
index 0000000..1b3692f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetEncodingException;
+
+import java.io.IOException;
+
+/**
+ * Write integers with delta encoding and binary packing
+ * The format is as follows:
+ * <p/>
+ * <pre>
+ * {@code
+ * delta-binary-packing: <page-header> <block>*
+ * page-header := <block size in values> <number of miniblocks in a block> <total value count> <first value>
+ * block := <min delta> <list of bitwidths of miniblocks> <miniblocks>
+ *
+ * min delta : zig-zag var int encoded
+ * bitWidthsOfMiniBlock : 1 byte little endian
+ * blockSizeInValues,blockSizeInValues,totalValueCount,firstValue : unsigned varint
+ * }
+ * </pre>
+ *
+ * The algorithm and format is inspired by D. Lemire's paper: http://lemire.me/blog/archives/2012/09/12/fast-integer-compression-decoding-billions-of-integers-per-second/
+ *
+ * @author Tianshuo Deng
+ */
+public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
+ /**
+ * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+ * reused between flushes.
+ */
+ public static final int MAX_BITWIDTH = 32;
+
+ public static final int DEFAULT_NUM_BLOCK_VALUES = 128;
+
+ public static final int DEFAULT_NUM_MINIBLOCKS = 4;
+
+ private final CapacityByteArrayOutputStream baos;
+
+ /**
+ * stores blockSizeInValues, miniBlockNumInABlock and miniBlockSizeInValues
+ */
+ private final DeltaBinaryPackingConfig config;
+
+ /**
+ * bit width for each mini block, reused between flushes
+ */
+ private final int[] bitWidths;
+
+ private int totalValueCount = 0;
+
+ /**
+ * a pointer to deltaBlockBuffer indicating the end of deltaBlockBuffer
+ * the number of values in the deltaBlockBuffer that haven't flushed to baos
+ * it will be reset after each flush
+ */
+ private int deltaValuesToFlush = 0;
+
+ /**
+ * stores delta values starting from the 2nd value written(1st value is stored in header).
+ * It's reused between flushes
+ */
+ private int[] deltaBlockBuffer;
+
+ /**
+ * bytes buffer for a mini block, it is reused for each mini block.
+ * Therefore the size of biggest miniblock with bitwith of MAX_BITWITH is allocated
+ */
+ private byte[] miniBlockByteBuffer;
+
+ /**
+ * firstValue is written to the header of the page
+ */
+ private int firstValue = 0;
+
+ /**
+ * cache previous written value for calculating delta
+ */
+ private int previousValue = 0;
+
+ /**
+ * min delta is written to the beginning of each block.
+ * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+ * therefore are all positive
+ * it will be reset after each flush
+ */
+ private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+
+ public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) {
+ this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize);
+ }
+
+ public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) {
+ this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
+ bitWidths = new int[config.miniBlockNumInABlock];
+ deltaBlockBuffer = new int[blockSizeInValues];
+ miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
+ baos = new CapacityByteArrayOutputStream(slabSize, pageSize);
+ }
+
+ @Override
+ public long getBufferedSize() {
+ return baos.size();
+ }
+
+ @Override
+ public void writeInteger(int v) {
+ totalValueCount++;
+
+ if (totalValueCount == 1) {
+ firstValue = v;
+ previousValue = firstValue;
+ return;
+ }
+
+ int delta = v - previousValue;//calculate delta
+ previousValue = v;
+
+ deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+ if (delta < minDeltaInCurrentBlock) {
+ minDeltaInCurrentBlock = delta;
+ }
+
+ if (config.blockSizeInValues == deltaValuesToFlush) {
+ flushBlockBuffer();
+ }
+ }
+
+ private void flushBlockBuffer() {
+ //since we store the min delta, the deltas will be converted to be the difference to min delta and all positive
+ for (int i = 0; i < deltaValuesToFlush; i++) {
+ deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+ }
+
+ writeMinDelta();
+ int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+ calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+ for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+ writeBitWidthForMiniBlock(i);
+ }
+
+ for (int i = 0; i < miniBlocksToFlush; i++) {
+ //writing i th miniblock
+ int currentBitWidth = bitWidths[i];
+ BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
+ int miniBlockStart = i * config.miniBlockSizeInValues;
+ for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {//8 values per pack
+ // mini block is atomic in terms of flushing
+ // This may write more values when reach to the end of data writing to last mini block,
+ // since it may not be aligend to miniblock,
+ // but doesnt matter. The reader uses total count to see if reached the end.
+ packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, 0);
+ baos.write(miniBlockByteBuffer, 0, currentBitWidth);
+ }
+ }
+
+ minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ deltaValuesToFlush = 0;
+ }
+
+ private void writeBitWidthForMiniBlock(int i) {
+ try {
+ BytesUtils.writeIntLittleEndianOnOneByte(baos, bitWidths[i]);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("can not write bitwith for miniblock", e);
+ }
+ }
+
+ private void writeMinDelta() {
+ try {
+ BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, baos);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("can not write min delta for block", e);
+ }
+ }
+
+ /**
+ * iterate through values in each mini block and calculate the bitWidths of max values.
+ *
+ * @param miniBlocksToFlush
+ */
+ private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+ for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+
+ int mask = 0;
+ int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
+
+ //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
+ int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
+
+ for (int i = miniStart; i < miniEnd; i++) {
+ mask |= deltaBlockBuffer[i];
+ }
+ bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
+ }
+ }
+
+ private int getMiniBlockCountToFlush(double numberCount) {
+ return (int) Math.ceil(numberCount / config.miniBlockSizeInValues);
+ }
+
+ /**
+ * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+ *
+ * @return
+ */
+ @Override
+ public BytesInput getBytes() {
+ //The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+ if (deltaValuesToFlush != 0) {
+ flushBlockBuffer();
+ }
+ return BytesInput.concat(
+ config.toBytesInput(),
+ BytesInput.fromUnsignedVarInt(totalValueCount),
+ BytesInput.fromZigZagVarInt(firstValue),
+ BytesInput.from(baos));
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ return Encoding.DELTA_BINARY_PACKED;
+ }
+
+ @Override
+ public void reset() {
+ this.totalValueCount = 0;
+ this.baos.reset();
+ this.deltaValuesToFlush = 0;
+ this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ return baos.getCapacity();
+ }
+
+ @Override
+ public String memUsageString(String prefix) {
+ return String.format("%s DeltaBinaryPacking %d bytes", prefix, getAllocatedSize());
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
new file mode 100644
index 0000000..2db1336
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltalengthbytearray;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reads binary data written by {@link DeltaLengthByteArrayValuesWriter}: a delta-binary-packed
+ * run of lengths followed by the concatenated byte arrays.
+ *
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaLengthByteArrayValuesReader extends ValuesReader {
+
+  private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesReader.class);
+  private final ValuesReader lengthReader;
+  /** page being read */
+  private byte[] in;
+  /** position in {@code in} of the next byte array to return */
+  private int offset;
+
+  public DeltaLengthByteArrayValuesReader() {
+    this.lengthReader = new DeltaBinaryPackingValuesReader();
+  }
+
+  /**
+   * Initializes the length reader at {@code offset}; the concatenated byte arrays start
+   * where the encoded lengths end.
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset)
+      throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset " + offset + " for length " + (in.length - offset));
+    lengthReader.initFromPage(valueCount, in, offset);
+    int bytesStart = lengthReader.getNextOffset();
+    this.in = in;
+    this.offset = bytesStart;
+  }
+
+  /**
+   * @return the next byte array; it wraps the page bytes without copying them
+   */
+  @Override
+  public Binary readBytes() {
+    int length = lengthReader.readInteger();
+    int start = offset;
+    offset = start + length;
+    return Binary.fromByteArray(in, start, length);
+  }
+
+  @Override
+  public void skip() {
+    int length = lengthReader.readInteger();
+    offset = offset + length;
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
new file mode 100644
index 0000000..0a498b1
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltalengthbytearray;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Write lengths of byte-arrays using delta encoding, followed by concatenated byte-arrays
+ * <pre>
+ * {@code
+ * delta-length-byte-array : length* byte-array*
+ * }
+ * </pre>
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
+
+  private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesWriter.class);
+
+  /** delta-binary-packs the length of every value */
+  private final ValuesWriter lengthWriter;
+  /** holds the concatenated value bytes */
+  private final CapacityByteArrayOutputStream arrayOut;
+  private final LittleEndianDataOutputStream out;
+
+  public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
+    arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+    out = new LittleEndianDataOutputStream(arrayOut);
+    lengthWriter = new DeltaBinaryPackingValuesWriter(
+        DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
+        DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
+        initialSize, pageSize);
+  }
+
+  /**
+   * Records the length of {@code v} and appends its bytes.
+   * The bytes are streamed with writeTo rather than getBytes() so that slice-backed
+   * Binaries (as passed by the delta byte array writer) are not copied first.
+   */
+  @Override
+  public void writeBytes(Binary v) {
+    try {
+      lengthWriter.writeInteger(v.length());
+      v.writeTo(out);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write bytes", e);
+    }
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return lengthWriter.getBufferedSize() + arrayOut.size();
+  }
+
+  /**
+   * @return the encoded lengths followed by the concatenated value bytes
+   */
+  @Override
+  public BytesInput getBytes() {
+    try {
+      out.flush();
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page", e);
+    }
+    if (Log.DEBUG) LOG.debug("writing a buffer of size " + arrayOut.size());
+    return BytesInput.concat(lengthWriter.getBytes(), BytesInput.from(arrayOut));
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return Encoding.DELTA_LENGTH_BYTE_ARRAY;
+  }
+
+  @Override
+  public void reset() {
+    lengthWriter.reset();
+    arrayOut.reset();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return lengthWriter.getAllocatedSize() + arrayOut.getCapacity();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
new file mode 100644
index 0000000..de3df02
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltastrings;
+
+import java.io.IOException;
+
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reads binary data written by {@link DeltaByteArrayWriter}: delta-binary-packed prefix lengths
+ * followed by delta-length-encoded suffixes. Each value is rebuilt from the previous value's
+ * prefix plus its own suffix.
+ *
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaByteArrayReader extends ValuesReader {
+  private final ValuesReader prefixLengthReader;
+  private final ValuesReader suffixReader;
+
+  /** the last value decoded; the next value's prefix is copied from it */
+  private Binary previous;
+
+  public DeltaByteArrayReader() {
+    this.prefixLengthReader = new DeltaBinaryPackingValuesReader();
+    this.suffixReader = new DeltaLengthByteArrayValuesReader();
+    this.previous = Binary.fromByteArray(new byte[0]);
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset)
+      throws IOException {
+    prefixLengthReader.initFromPage(valueCount, page, offset);
+    // the suffixes start where the encoded prefix lengths end
+    int next = prefixLengthReader.getNextOffset();
+    suffixReader.initFromPage(valueCount, page, next);
+  }
+
+  /**
+   * Skips the next value. It must still be fully decoded: the value after it takes its prefix
+   * from this one, and advancing the sub-readers alone would leave {@code previous} stale.
+   */
+  @Override
+  public void skip() {
+    readBytes();
+  }
+
+  @Override
+  public Binary readBytes() {
+    int prefixLength = prefixLengthReader.readInteger();
+    // This does not copy bytes
+    Binary suffix = suffixReader.readBytes();
+    int length = prefixLength + suffix.length();
+
+    // We have to do this to materialize the output
+    if (prefixLength != 0) {
+      byte[] out = new byte[length];
+      System.arraycopy(previous.getBytes(), 0, out, 0, prefixLength);
+      System.arraycopy(suffix.getBytes(), 0, out, prefixLength, suffix.length());
+      previous = Binary.fromByteArray(out);
+    } else {
+      previous = suffix;
+    }
+    return previous;
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
new file mode 100644
index 0000000..8c71d5d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltastrings;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Write prefix lengths using delta encoding, followed by suffixes with Delta length byte arrays
+ * <pre>
+ * {@code
+ * delta-byte-array : prefix-length* suffixes*
+ * }
+ * </pre>
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaByteArrayWriter extends ValuesWriter {
+
+  private final ValuesWriter prefixLengthWriter;
+  private final ValuesWriter suffixWriter;
+  /** last value written in the current page; the next value is encoded relative to it */
+  private byte[] previous;
+
+  public DeltaByteArrayWriter(int initialCapacity, int pageSize) {
+    this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(
+        DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
+        DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
+        initialCapacity, pageSize);
+    this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize);
+    this.previous = new byte[0];
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return prefixLengthWriter.getBufferedSize() + suffixWriter.getBufferedSize();
+  }
+
+  /**
+   * @return the encoded prefix lengths followed by the encoded suffixes
+   */
+  @Override
+  public BytesInput getBytes() {
+    return BytesInput.concat(prefixLengthWriter.getBytes(), suffixWriter.getBytes());
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return Encoding.DELTA_BYTE_ARRAY;
+  }
+
+  /**
+   * Resets both sub-writers and forgets the previous value. This makes the first value of
+   * the next page get a zero-length prefix, so it can be decoded without any state from this page.
+   */
+  @Override
+  public void reset() {
+    prefixLengthWriter.reset();
+    suffixWriter.reset();
+    this.previous = new byte[0];
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return prefixLengthWriter.getAllocatedSize() + suffixWriter.getAllocatedSize();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return suffixWriter.memUsageString(prefixLengthWriter.memUsageString(prefix) + " DELTA_STRINGS");
+  }
+
+  /**
+   * Writes the length of the prefix {@code v} shares with the previous value, then the remaining suffix.
+   */
+  @Override
+  public void writeBytes(Binary v) {
+    byte[] vb = v.getBytes();
+    int maxPrefix = Math.min(previous.length, vb.length);
+    int prefixLength = 0;
+    while (prefixLength < maxPrefix && previous[prefixLength] == vb[prefixLength]) {
+      prefixLength++;
+    }
+    prefixLengthWriter.writeInteger(prefixLength);
+    suffixWriter.writeBytes(Binary.fromByteArray(vb, prefixLength, vb.length - prefixLength));
+    previous = vb;
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
new file mode 100644
index 0000000..972c87e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reads dictionary-encoded values. The page holds dictionary ids decoded with a
+ * {@link RunLengthBitPackingHybridDecoder}, and each id is resolved through the {@link Dictionary}.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class DictionaryValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(DictionaryValuesReader.class);
+
+  private ByteArrayInputStream in;
+
+  private Dictionary dictionary;
+
+  private RunLengthBitPackingHybridDecoder decoder;
+
+  public DictionaryValuesReader(Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  /**
+   * Prepares to decode ids from {@code page} starting at {@code offset}. The first byte there is
+   * the id bit width. If nothing remains after {@code offset}, every read fails with an IOException.
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset)
+      throws IOException {
+    int remaining = page.length - offset;
+    this.in = new ByteArrayInputStream(page, offset, remaining);
+    if (remaining <= 0) {
+      decoder = new RunLengthBitPackingHybridDecoder(1, in) {
+        @Override
+        public int readInt() throws IOException {
+          throw new IOException("Attempt to read from empty page");
+        }
+      };
+      return;
+    }
+    if (DEBUG) LOG.debug("init from page at offset " + offset + " for length " + remaining);
+    int idBitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
+    if (DEBUG) LOG.debug("bit width " + idBitWidth);
+    decoder = new RunLengthBitPackingHybridDecoder(idBitWidth, in);
+  }
+
+  /**
+   * Decodes the next dictionary id, rethrowing decoder I/O failures as ParquetDecodingException.
+   */
+  private int nextDictionaryId() {
+    try {
+      return decoder.readInt();
+    } catch (IOException e) {
+      throw new ParquetDecodingException(e);
+    }
+  }
+
+  @Override
+  public int readValueDictionaryId() {
+    return nextDictionaryId();
+  }
+
+  @Override
+  public Binary readBytes() {
+    return dictionary.decodeToBinary(nextDictionaryId());
+  }
+
+  @Override
+  public float readFloat() {
+    return dictionary.decodeToFloat(nextDictionaryId());
+  }
+
+  @Override
+  public double readDouble() {
+    return dictionary.decodeToDouble(nextDictionaryId());
+  }
+
+  @Override
+  public int readInteger() {
+    return dictionary.decodeToInt(nextDictionaryId());
+  }
+
+  @Override
+  public long readLong() {
+    return dictionary.decodeToLong(nextDictionaryId());
+  }
+
+  /**
+   * Consumes one id without resolving it; the value type does not matter when skipping.
+   */
+  @Override
+  public void skip() {
+    nextDictionaryId();
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
new file mode 100644
index 0000000..928c125
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.bytes.BytesInput.concat;
+import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.doubles.Double2IntMap;
+import it.unimi.dsi.fastutil.doubles.DoubleIterator;
+import it.unimi.dsi.fastutil.floats.Float2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.floats.Float2IntMap;
+import it.unimi.dsi.fastutil.floats.FloatIterator;
+import it.unimi.dsi.fastutil.ints.Int2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.RequiresFallback;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.dictionary.IntList.IntIterator;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Will attempt to encode values using a dictionary and fall back to plain encoding
+ * if the dictionary gets too big
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class DictionaryValuesWriter extends ValuesWriter implements RequiresFallback {
+ private static final Log LOG = Log.getLog(DictionaryValuesWriter.class);
+
+ /* max entries allowed for the dictionary will fail over to plain encoding if reached */
+ private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
+ private static final int MIN_INITIAL_SLAB_SIZE = 64;
+
+ /* encoding to label the data page */
+ private final Encoding encodingForDataPage;
+
+ /* encoding to label the dictionary page */
+ protected final Encoding encodingForDictionaryPage;
+
+ /* maximum size in bytes allowed for the dictionary will fail over to plain encoding if reached */
+ protected final int maxDictionaryByteSize;
+
+ /* will become true if the dictionary becomes too big */
+ protected boolean dictionaryTooBig;
+
+ /* current size in bytes the dictionary will take once serialized */
+ protected int dictionaryByteSize;
+
+ /* size in bytes of the dictionary at the end of last dictionary encoded page (in case the current page falls back to PLAIN) */
+ protected int lastUsedDictionaryByteSize;
+
+ /* size in items of the dictionary at the end of last dictionary encoded page (in case the current page falls back to PLAIN) */
+ protected int lastUsedDictionarySize;
+
+ /* dictionary encoded values */
+ protected IntList encodedValues = new IntList();
+
+ /**
+ * @param maxDictionaryByteSize
+ */
+ protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ this.maxDictionaryByteSize = maxDictionaryByteSize;
+ this.encodingForDataPage = encodingForDataPage;
+ this.encodingForDictionaryPage = encodingForDictionaryPage;
+ }
+
+ protected DictionaryPage dictPage(ValuesWriter dictionaryEncoder) {
+ return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage);
+ }
+
+ @Override
+ public boolean shouldFallBack() {
+ // if the dictionary reaches the max byte size or the values can not be encoded on 4 bytes anymore.
+ return dictionaryByteSize > maxDictionaryByteSize
+ || getDictionarySize() > MAX_DICTIONARY_ENTRIES;
+ }
+
+ @Override
+ public boolean isCompressionSatisfying(long rawSize, long encodedSize) {
+ return (encodedSize + dictionaryByteSize) < rawSize;
+ }
+
+ @Override
+ public void fallBackAllValuesTo(ValuesWriter writer) {
+ fallBackDictionaryEncodedData(writer);
+ if (lastUsedDictionarySize == 0) {
+ // if we never used the dictionary
+ // we free dictionary encoded data
+ clearDictionaryContent();
+ dictionaryByteSize = 0;
+ encodedValues = new IntList();
+ }
+ }
+
+ abstract protected void fallBackDictionaryEncodedData(ValuesWriter writer);
+
+ @Override
+ public long getBufferedSize() {
+ return encodedValues.size() * 4;
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ // size used in memory
+ return encodedValues.size() * 4 + dictionaryByteSize;
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ int maxDicId = getDictionarySize() - 1;
+ if (DEBUG) LOG.debug("max dic id " + maxDicId);
+ int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
+
+ int initialSlabSize =
+ CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
+
+ RunLengthBitPackingHybridEncoder encoder =
+ new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
+ IntIterator iterator = encodedValues.iterator();
+ try {
+ while (iterator.hasNext()) {
+ encoder.writeInt(iterator.next());
+ }
+ // encodes the bit width
+ byte[] bytesHeader = new byte[] { (byte) bitWidth };
+ BytesInput rleEncodedBytes = encoder.toBytes();
+ if (DEBUG) LOG.debug("rle encoded bytes " + rleEncodedBytes.size());
+ BytesInput bytes = concat(BytesInput.from(bytesHeader), rleEncodedBytes);
+ // remember size of dictionary when we last wrote a page
+ lastUsedDictionarySize = getDictionarySize();
+ lastUsedDictionaryByteSize = dictionaryByteSize;
+ return bytes;
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not encode the values", e);
+ }
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ return encodingForDataPage;
+ }
+
+ @Override
+ public void reset() {
+ encodedValues = new IntList();
+ }
+
+ @Override
+ public void resetDictionary() {
+ lastUsedDictionaryByteSize = 0;
+ lastUsedDictionarySize = 0;
+ dictionaryTooBig = false;
+ clearDictionaryContent();
+ }
+
+ /**
+ * clear/free the underlying dictionary content
+ */
+ protected abstract void clearDictionaryContent();
+
+ /**
+ * @return size in items
+ */
+ protected abstract int getDictionarySize();
+
+ @Override
+ public String memUsageString(String prefix) {
+ return String.format(
+ "%s DictionaryValuesWriter{\n"
+ + "%s\n"
+ + "%s\n"
+ + "%s}\n",
+ prefix,
+ prefix + " dict:" + dictionaryByteSize,
+ prefix + " values:" + String.valueOf(encodedValues.size() * 4),
+ prefix
+ );
+ }
+
+ /**
+ *
+ */
+ public static class PlainBinaryDictionaryValuesWriter extends DictionaryValuesWriter {
+
+ /* type specific dictionary content */
+ protected Object2IntMap<Binary> binaryDictionaryContent = new Object2IntLinkedOpenHashMap<Binary>();
+
+ /**
+ * @param maxDictionaryByteSize
+ */
+ public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ binaryDictionaryContent.defaultReturnValue(-1);
+ }
+
+ @Override
+ public void writeBytes(Binary v) {
+ int id = binaryDictionaryContent.getInt(v);
+ if (id == -1) {
+ id = binaryDictionaryContent.size();
+ binaryDictionaryContent.put(copy(v), id);
+ // length as int (4 bytes) + actual bytes
+ dictionaryByteSize += 4 + v.length();
+ }
+ encodedValues.add(id);
+ }
+
+ @Override
+ public DictionaryPage createDictionaryPage() {
+ if (lastUsedDictionarySize > 0) {
+ // return a dictionary only if we actually used it
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
+ // write only the part of the dict that we used
+ for (int i = 0; i < lastUsedDictionarySize; i++) {
+ Binary entry = binaryIterator.next();
+ dictionaryEncoder.writeBytes(entry);
+ }
+ return dictPage(dictionaryEncoder);
+ }
+ return null;
+ }
+
+ @Override
+ public int getDictionarySize() {
+ return binaryDictionaryContent.size();
+ }
+
+ @Override
+ protected void clearDictionaryContent() {
+ binaryDictionaryContent.clear();
+ }
+
+ @Override
+ public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+ //build reverse dictionary
+ Binary[] reverseDictionary = new Binary[getDictionarySize()];
+ for (Object2IntMap.Entry<Binary> entry : binaryDictionaryContent.object2IntEntrySet()) {
+ reverseDictionary[entry.getIntValue()] = entry.getKey();
+ }
+
+ //fall back to plain encoding
+ IntIterator iterator = encodedValues.iterator();
+ while (iterator.hasNext()) {
+ int id = iterator.next();
+ writer.writeBytes(reverseDictionary[id]);
+ }
+ }
+
+ protected static Binary copy(Binary binary) {
+ return Binary.fromByteArray(
+ Arrays.copyOf(binary.getBytes(), binary.length()));
+ }
+ }
+
+ /**
+ *
+ */
+ public static class PlainFixedLenArrayDictionaryValuesWriter extends PlainBinaryDictionaryValuesWriter {
+
+ private final int length;
+
+ /**
+ * @param maxDictionaryByteSize
+ * @param initialSize
+ */
+ public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ this.length = length;
+ }
+
+ @Override
+ public void writeBytes(Binary value) {
+ int id = binaryDictionaryContent.getInt(value);
+ if (id == -1) {
+ id = binaryDictionaryContent.size();
+ binaryDictionaryContent.put(copy(value), id);
+ dictionaryByteSize += length;
+ }
+ encodedValues.add(id);
+ }
+
+ @Override
+ public DictionaryPage createDictionaryPage() {
+ if (lastUsedDictionarySize > 0) {
+ // return a dictionary only if we actually used it
+ FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
+ // write only the part of the dict that we used
+ for (int i = 0; i < lastUsedDictionarySize; i++) {
+ Binary entry = binaryIterator.next();
+ dictionaryEncoder.writeBytes(entry);
+ }
+ return dictPage(dictionaryEncoder);
+ }
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ public static class PlainLongDictionaryValuesWriter extends DictionaryValuesWriter {
+
+ /* type specific dictionary content */
+ private Long2IntMap longDictionaryContent = new Long2IntLinkedOpenHashMap();
+
+ /**
+ * @param maxDictionaryByteSize
+ * @param initialSize
+ */
+ public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ longDictionaryContent.defaultReturnValue(-1);
+ }
+
+ @Override
+ public void writeLong(long v) {
+ int id = longDictionaryContent.get(v);
+ if (id == -1) {
+ id = longDictionaryContent.size();
+ longDictionaryContent.put(v, id);
+ dictionaryByteSize += 8;
+ }
+ encodedValues.add(id);
+ }
+
+ @Override
+ public DictionaryPage createDictionaryPage() {
+ if (lastUsedDictionarySize > 0) {
+ // return a dictionary only if we actually used it
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ LongIterator longIterator = longDictionaryContent.keySet().iterator();
+ // write only the part of the dict that we used
+ for (int i = 0; i < lastUsedDictionarySize; i++) {
+ dictionaryEncoder.writeLong(longIterator.nextLong());
+ }
+ return dictPage(dictionaryEncoder);
+ }
+ return null;
+ }
+
+ @Override
+ public int getDictionarySize() {
+ return longDictionaryContent.size();
+ }
+
+ @Override
+ protected void clearDictionaryContent() {
+ longDictionaryContent.clear();
+ }
+
+ @Override
+ public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+ //build reverse dictionary
+ long[] reverseDictionary = new long[getDictionarySize()];
+ ObjectIterator<Long2IntMap.Entry> entryIterator = longDictionaryContent.long2IntEntrySet().iterator();
+ while (entryIterator.hasNext()) {
+ Long2IntMap.Entry entry = entryIterator.next();
+ reverseDictionary[entry.getIntValue()] = entry.getLongKey();
+ }
+
+ //fall back to plain encoding
+ IntIterator iterator = encodedValues.iterator();
+ while (iterator.hasNext()) {
+ int id = iterator.next();
+ writer.writeLong(reverseDictionary[id]);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public static class PlainDoubleDictionaryValuesWriter extends DictionaryValuesWriter {
+
+ /* type specific dictionary content */
+ private Double2IntMap doubleDictionaryContent = new Double2IntLinkedOpenHashMap();
+
+ /**
+ * @param maxDictionaryByteSize
+ * @param initialSize
+ */
+ public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ doubleDictionaryContent.defaultReturnValue(-1);
+ }
+
+ @Override
+ public void writeDouble(double v) {
+ int id = doubleDictionaryContent.get(v);
+ if (id == -1) {
+ id = doubleDictionaryContent.size();
+ doubleDictionaryContent.put(v, id);
+ dictionaryByteSize += 8;
+ }
+ encodedValues.add(id);
+ }
+
+ @Override
+ public DictionaryPage createDictionaryPage() {
+ if (lastUsedDictionarySize > 0) {
+ // return a dictionary only if we actually used it
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator();
+ // write only the part of the dict that we used
+ for (int i = 0; i < lastUsedDictionarySize; i++) {
+ dictionaryEncoder.writeDouble(doubleIterator.nextDouble());
+ }
+ return dictPage(dictionaryEncoder);
+ }
+ return null;
+ }
+
+ @Override
+ public int getDictionarySize() {
+ return doubleDictionaryContent.size();
+ }
+
+ @Override
+ protected void clearDictionaryContent() {
+ doubleDictionaryContent.clear();
+ }
+
+ @Override
+ public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+ //build reverse dictionary
+ double[] reverseDictionary = new double[getDictionarySize()];
+ ObjectIterator<Double2IntMap.Entry> entryIterator = doubleDictionaryContent.double2IntEntrySet().iterator();
+ while (entryIterator.hasNext()) {
+ Double2IntMap.Entry entry = entryIterator.next();
+ reverseDictionary[entry.getIntValue()] = entry.getDoubleKey();
+ }
+
+ //fall back to plain encoding
+ IntIterator iterator = encodedValues.iterator();
+ while (iterator.hasNext()) {
+ int id = iterator.next();
+ writer.writeDouble(reverseDictionary[id]);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public static class PlainIntegerDictionaryValuesWriter extends DictionaryValuesWriter {
+
+ /* type specific dictionary content */
+ private Int2IntMap intDictionaryContent = new Int2IntLinkedOpenHashMap();
+
+ /**
+ * @param maxDictionaryByteSize
+ * @param initialSize
+ */
+ public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ intDictionaryContent.defaultReturnValue(-1);
+ }
+
+ @Override
+ public void writeInteger(int v) {
+ int id = intDictionaryContent.get(v);
+ if (id == -1) {
+ id = intDictionaryContent.size();
+ intDictionaryContent.put(v, id);
+ dictionaryByteSize += 4;
+ }
+ encodedValues.add(id);
+ }
+
+ @Override
+ public DictionaryPage createDictionaryPage() {
+ if (lastUsedDictionarySize > 0) {
+ // return a dictionary only if we actually used it
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator();
+ // write only the part of the dict that we used
+ for (int i = 0; i < lastUsedDictionarySize; i++) {
+ dictionaryEncoder.writeInteger(intIterator.nextInt());
+ }
+ return dictPage(dictionaryEncoder);
+ }
+ return null;
+ }
+
+ @Override
+ public int getDictionarySize() {
+ return intDictionaryContent.size();
+ }
+
+ @Override
+ protected void clearDictionaryContent() {
+ intDictionaryContent.clear();
+ }
+
+ @Override
+ public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+ //build reverse dictionary
+ int[] reverseDictionary = new int[getDictionarySize()];
+ ObjectIterator<Int2IntMap.Entry> entryIterator = intDictionaryContent.int2IntEntrySet().iterator();
+ while (entryIterator.hasNext()) {
+ Int2IntMap.Entry entry = entryIterator.next();
+ reverseDictionary[entry.getIntValue()] = entry.getIntKey();
+ }
+
+ //fall back to plain encoding
+ IntIterator iterator = encodedValues.iterator();
+ while (iterator.hasNext()) {
+ int id = iterator.next();
+ writer.writeInteger(reverseDictionary[id]);
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public static class PlainFloatDictionaryValuesWriter extends DictionaryValuesWriter {
+
+ /* type specific dictionary content */
+ private Float2IntMap floatDictionaryContent = new Float2IntLinkedOpenHashMap();
+
+ /**
+ * @param maxDictionaryByteSize
+ * @param initialSize
+ */
+ public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ floatDictionaryContent.defaultReturnValue(-1);
+ }
+
+ @Override
+ public void writeFloat(float v) {
+ int id = floatDictionaryContent.get(v);
+ if (id == -1) {
+ id = floatDictionaryContent.size();
+ floatDictionaryContent.put(v, id);
+ dictionaryByteSize += 4;
+ }
+ encodedValues.add(id);
+ }
+
+ @Override
+ public DictionaryPage createDictionaryPage() {
+ if (lastUsedDictionarySize > 0) {
+ // return a dictionary only if we actually used it
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ FloatIterator floatIterator = floatDictionaryContent.keySet().iterator();
+ // write only the part of the dict that we used
+ for (int i = 0; i < lastUsedDictionarySize; i++) {
+ dictionaryEncoder.writeFloat(floatIterator.nextFloat());
+ }
+ return dictPage(dictionaryEncoder);
+ }
+ return null;
+ }
+
+ @Override
+ public int getDictionarySize() {
+ return floatDictionaryContent.size();
+ }
+
+ @Override
+ protected void clearDictionaryContent() {
+ floatDictionaryContent.clear();
+ }
+
+ @Override
+ public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+ //build reverse dictionary
+ float[] reverseDictionary = new float[getDictionarySize()];
+ ObjectIterator<Float2IntMap.Entry> entryIterator = floatDictionaryContent.float2IntEntrySet().iterator();
+ while (entryIterator.hasNext()) {
+ Float2IntMap.Entry entry = entryIterator.next();
+ reverseDictionary[entry.getIntValue()] = entry.getFloatKey();
+ }
+
+ //fall back to plain encoding
+ IntIterator iterator = encodedValues.iterator();
+ while (iterator.hasNext()) {
+ int id = iterator.next();
+ writer.writeFloat(reverseDictionary[id]);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java
new file mode 100644
index 0000000..3201072
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * An append-only integer list that avoids autoboxing and buffer resizing.
 * <p>
 * Values are stored in fixed-size slabs of 64K ints. When the current slab is full it
 * is retired to a list and a fresh slab is allocated, so existing data is never copied.
 * Note that every list eagerly allocates one full slab, even while empty.
 *
 * @author Julien Le Dem
 *
 */
public class IntList {

  private static final int SLAB_SIZE = 64 * 1024;

  /**
   * Iterates over the content of the list.
   * Not a {@link java.util.Iterator} on purpose, to avoid autoboxing.
   *
   * @author Julien Le Dem
   *
   */
  public static class IntIterator {

    private final int[][] slabs;
    private int current;
    private final int count;

    /**
     * slabs will be iterated in order up to the provided count
     * as the last slab may not be full
     * @param slabs contain the ints
     * @param count total count of ints
     */
    public IntIterator(int[][] slabs, int count) {
      this.slabs = slabs;
      this.count = count;
    }

    /**
     * @return whether there is a next value
     */
    public boolean hasNext() {
      return current < count;
    }

    /**
     * @return the next int
     * @throws java.util.NoSuchElementException if all {@code count} values were already returned
     */
    public int next() {
      // without this check a read past the end returns stale zeros from the last slab
      if (current >= count) {
        throw new java.util.NoSuchElementException("all " + count + " values have already been returned");
      }
      final int result = slabs[current / SLAB_SIZE][current % SLAB_SIZE];
      ++current;
      return result;
    }

  }

  /* full slabs, in insertion order */
  private List<int[]> slabs = new ArrayList<int[]>();
  /* slab currently being filled */
  private int[] currentSlab;
  /* number of values written into currentSlab */
  private int currentSlabPos;

  /**
   * construct an empty list
   */
  public IntList() {
    initSlab();
  }

  private void initSlab() {
    currentSlab = new int[SLAB_SIZE];
    currentSlabPos = 0;
  }

  /**
   * @param i value to append to the end of the list
   */
  public void add(int i) {
    if (currentSlabPos == currentSlab.length) {
      slabs.add(currentSlab);
      initSlab();
    }
    currentSlab[currentSlabPos] = i;
    ++currentSlabPos;
  }

  /**
   * Returns an iterator over a snapshot of the current slab layout. Values appended
   * after this call are not guaranteed to be visible to the iterator.
   * (not an actual Iterable)
   * @return an IntIterator on the content
   */
  public IntIterator iterator() {
    int[][] itSlabs = slabs.toArray(new int[slabs.size() + 1][]);
    itSlabs[slabs.size()] = currentSlab;
    return new IntIterator(itSlabs, SLAB_SIZE * slabs.size() + currentSlabPos);
  }

  /**
   * @return the current size of the list
   */
  public int size() {
    return SLAB_SIZE * slabs.size() + currentSlabPos;
  }

}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
new file mode 100644
index 0000000..055dd73
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.PLAIN;
+
+import java.io.IOException;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * a simple implementation of dictionary for plain encoded values
+ *
+ */
+public abstract class PlainValuesDictionary extends Dictionary {
+
+ /**
+ * @param dictionaryPage the PLAIN encoded content of the dictionary
+ * @throws IOException
+ */
+ protected PlainValuesDictionary(DictionaryPage dictionaryPage) throws IOException {
+ super(dictionaryPage.getEncoding());
+ if (dictionaryPage.getEncoding() != PLAIN_DICTIONARY
+ && dictionaryPage.getEncoding() != PLAIN) {
+ throw new ParquetDecodingException("Dictionary data encoding type not supported: " + dictionaryPage.getEncoding());
+ }
+ }
+
+ /**
+ * a simple implementation of dictionary for plain encoded binary
+ */
+ public static class PlainBinaryDictionary extends PlainValuesDictionary {
+
+ private Binary[] binaryDictionaryContent = null;
+
+ /**
+ * Decodes {@link Binary} values from a {@link DictionaryPage}.
+ *
+ * Values are read as length-prefixed values with a 4-byte little-endian
+ * length.
+ *
+ * @param dictionaryPage a {@code DictionaryPage} of encoded binary values
+ * @throws IOException
+ */
+ public PlainBinaryDictionary(DictionaryPage dictionaryPage) throws IOException {
+ this(dictionaryPage, null);
+ }
+
+ /**
+ * Decodes {@link Binary} values from a {@link DictionaryPage}.
+ *
+ * If the given {@code length} is null, the values will be read as length-
+ * prefixed values with a 4-byte little-endian length. If length is not
+ * null, it will be used as the length for all fixed-length {@code Binary}
+ * values read from the page.
+ *
+ * @param dictionaryPage a {@code DictionaryPage} of encoded binary values
+ * @param length a fixed length of binary arrays, or null if not fixed
+ * @throws IOException
+ */
+ public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) throws IOException {
+ super(dictionaryPage);
+ final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ binaryDictionaryContent = new Binary[dictionaryPage.getDictionarySize()];
+ int offset = 0;
+ if (length == null) {
+ // dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes
+ for (int i = 0; i < binaryDictionaryContent.length; i++) {
+ int len = readIntLittleEndian(dictionaryBytes, offset);
+ // read the length
+ offset += 4;
+ // wrap the content in a binary
+ binaryDictionaryContent[i] = Binary.fromByteArray(dictionaryBytes, offset, len);
+ // increment to the next value
+ offset += len;
+ }
+ } else {
+ // dictionary values are stored as fixed-length arrays
+ Preconditions.checkArgument(length > 0,
+ "Invalid byte array length: " + length);
+ for (int i = 0; i < binaryDictionaryContent.length; i++) {
+ // wrap the content in a Binary
+ binaryDictionaryContent[i] = Binary.fromByteArray(
+ dictionaryBytes, offset, length);
+ // increment to the next value
+ offset += length;
+ }
+ }
+ }
+
+ @Override
+ public Binary decodeToBinary(int id) {
+ return binaryDictionaryContent[id];
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("PlainBinaryDictionary {\n");
+ for (int i = 0; i < binaryDictionaryContent.length; i++) {
+ sb.append(i).append(" => ").append(binaryDictionaryContent[i]).append("\n");
+ }
+ return sb.append("}").toString();
+ }
+
+ @Override
+ public int getMaxId() {
+ return binaryDictionaryContent.length - 1;
+ }
+
+ }
+
+ /**
+ * a simple implementation of dictionary for plain encoded long values
+ */
+ public static class PlainLongDictionary extends PlainValuesDictionary {
+
+ private long[] longDictionaryContent = null;
+
+ /**
+ * @param dictionaryPage
+ * @throws IOException
+ */
+ public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
+ super(dictionaryPage);
+ final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ longDictionaryContent = new long[dictionaryPage.getDictionarySize()];
+ LongPlainValuesReader longReader = new LongPlainValuesReader();
+ longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ for (int i = 0; i < longDictionaryContent.length; i++) {
+ longDictionaryContent[i] = longReader.readLong();
+ }
+ }
+
+ @Override
+ public long decodeToLong(int id) {
+ return longDictionaryContent[id];
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("PlainLongDictionary {\n");
+ for (int i = 0; i < longDictionaryContent.length; i++) {
+ sb.append(i).append(" => ").append(longDictionaryContent[i]).append("\n");
+ }
+ return sb.append("}").toString();
+ }
+
+ @Override
+ public int getMaxId() {
+ return longDictionaryContent.length - 1;
+ }
+
+ }
+
+ /**
+ * a simple implementation of dictionary for plain encoded double values
+ */
+ public static class PlainDoubleDictionary extends PlainValuesDictionary {
+
+ private double[] doubleDictionaryContent = null;
+
+ /**
+ * @param dictionaryPage
+ * @throws IOException
+ */
+ public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException {
+ super(dictionaryPage);
+ final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()];
+ DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
+ doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ for (int i = 0; i < doubleDictionaryContent.length; i++) {
+ doubleDictionaryContent[i] = doubleReader.readDouble();
+ }
+ }
+
+ @Override
+ public double decodeToDouble(int id) {
+ return doubleDictionaryContent[id];
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("PlainDoubleDictionary {\n");
+ for (int i = 0; i < doubleDictionaryContent.length; i++) {
+ sb.append(i).append(" => ").append(doubleDictionaryContent[i]).append("\n");
+ }
+ return sb.append("}").toString();
+ }
+
+ @Override
+ public int getMaxId() {
+ return doubleDictionaryContent.length - 1;
+ }
+
+ }
+
+ /**
+ * a simple implementation of dictionary for plain encoded integer values
+ */
+ public static class PlainIntegerDictionary extends PlainValuesDictionary {
+
+ private int[] intDictionaryContent = null;
+
+ /**
+ * @param dictionaryPage
+ * @throws IOException
+ */
+ public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException {
+ super(dictionaryPage);
+ final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ intDictionaryContent = new int[dictionaryPage.getDictionarySize()];
+ IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
+ intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ for (int i = 0; i < intDictionaryContent.length; i++) {
+ intDictionaryContent[i] = intReader.readInteger();
+ }
+ }
+
+ @Override
+ public int decodeToInt(int id) {
+ return intDictionaryContent[id];
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("PlainIntegerDictionary {\n");
+ for (int i = 0; i < intDictionaryContent.length; i++) {
+ sb.append(i).append(" => ").append(intDictionaryContent[i]).append("\n");
+ }
+ return sb.append("}").toString();
+ }
+
+ @Override
+ public int getMaxId() {
+ return intDictionaryContent.length - 1;
+ }
+
+ }
+
+ /**
+ * a simple implementation of dictionary for plain encoded float values
+ */
+ public static class PlainFloatDictionary extends PlainValuesDictionary {
+
+ private float[] floatDictionaryContent = null;
+
+ /**
+ * @param dictionaryPage
+ * @throws IOException
+ */
+ public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException {
+ super(dictionaryPage);
+ final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ floatDictionaryContent = new float[dictionaryPage.getDictionarySize()];
+ FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
+ floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ for (int i = 0; i < floatDictionaryContent.length; i++) {
+ floatDictionaryContent[i] = floatReader.readFloat();
+ }
+ }
+
+ @Override
+ public float decodeToFloat(int id) {
+ return floatDictionaryContent[id];
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("PlainFloatDictionary {\n");
+ for (int i = 0; i < floatDictionaryContent.length; i++) {
+ sb.append(i).append(" => ").append(floatDictionaryContent[i]).append("\n");
+ }
+ return sb.append("}").toString();
+ }
+
+ @Override
+ public int getMaxId() {
+ return floatDictionaryContent.length - 1;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
new file mode 100644
index 0000000..f66c7c9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.fallback;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.RequiresFallback;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> extends ValuesWriter {
+
+ public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(I initialWriter, F fallBackWriter) {
+ return new FallbackValuesWriter<I, F>(initialWriter, fallBackWriter);
+ }
+
+ /** writer to start with */
+ public final I initialWriter;
+ /** fallback */
+ public final F fallBackWriter;
+
+ private boolean fellBackAlready = false;
+
+ /** writer currently written to */
+ private ValuesWriter currentWriter;
+
+ private boolean initialUsedAndHadDictionary = false;
+
+ /* size of raw data, even if dictionary is used, it will not have effect on raw data size, it is used to decide
+ * if fall back to plain encoding is better by comparing rawDataByteSize with Encoded data size
+ * It's also used in getBufferedSize, so the page will be written based on raw data size
+ */
+ private long rawDataByteSize = 0;
+
+ /** indicates if this is the first page being processed */
+ private boolean firstPage = true;
+
+ public FallbackValuesWriter(I initialWriter, F fallBackWriter) {
+ super();
+ this.initialWriter = initialWriter;
+ this.fallBackWriter = fallBackWriter;
+ this.currentWriter = initialWriter;
+ }
+
+ @Override
+ public long getBufferedSize() {
+ // use raw data size to decide if we want to flush the page
+ // so the actual size of the page written could be much more smaller
+ // due to dictionary encoding. This prevents page being too big when fallback happens.
+ return rawDataByteSize;
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ if (!fellBackAlready && firstPage) {
+ // we use the first page to decide if we're going to use this encoding
+ BytesInput bytes = initialWriter.getBytes();
+ if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
+ fallBack();
+ } else {
+ return bytes;
+ }
+ }
+ return currentWriter.getBytes();
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ Encoding encoding = currentWriter.getEncoding();
+ if (!fellBackAlready && !initialUsedAndHadDictionary) {
+ initialUsedAndHadDictionary = encoding.usesDictionary();
+ }
+ return encoding;
+ }
+
+ @Override
+ public void reset() {
+ rawDataByteSize = 0;
+ firstPage = false;
+ currentWriter.reset();
+ }
+
+ public DictionaryPage createDictionaryPage() {
+ if (initialUsedAndHadDictionary) {
+ return initialWriter.createDictionaryPage();
+ } else {
+ return currentWriter.createDictionaryPage();
+ }
+ }
+
+ public void resetDictionary() {
+ if (initialUsedAndHadDictionary) {
+ initialWriter.resetDictionary();
+ } else {
+ currentWriter.resetDictionary();
+ }
+ currentWriter = initialWriter;
+ fellBackAlready = false;
+ initialUsedAndHadDictionary = false;
+ firstPage = true;
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ return currentWriter.getAllocatedSize();
+ }
+
+ @Override
+ public String memUsageString(String prefix) {
+ return String.format(
+ "%s FallbackValuesWriter{\n"
+ + "%s\n"
+ + "%s\n"
+ + "%s}\n",
+ prefix,
+ initialWriter.memUsageString(prefix + " initial:"),
+ fallBackWriter.memUsageString(prefix + " fallback:"),
+ prefix
+ );
+ }
+
+ private void checkFallback() {
+ if (!fellBackAlready && initialWriter.shouldFallBack()) {
+ fallBack();
+ }
+ }
+
+ private void fallBack() {
+ fellBackAlready = true;
+ initialWriter.fallBackAllValuesTo(fallBackWriter);
+ currentWriter = fallBackWriter;
+ }
+
+ // passthrough writing the value
+
+ public void writeByte(int value) {
+ rawDataByteSize += 1;
+ currentWriter.writeByte(value);
+ checkFallback();
+ }
+
+ public void writeBytes(Binary v) {
+ //for rawdata, length(4 bytes int) is stored, followed by the binary content itself
+ rawDataByteSize += v.length() + 4;
+ currentWriter.writeBytes(v);
+ checkFallback();
+ }
+
+ public void writeInteger(int v) {
+ rawDataByteSize += 4;
+ currentWriter.writeInteger(v);
+ checkFallback();
+ }
+
+ public void writeLong(long v) {
+ rawDataByteSize += 8;
+ currentWriter.writeLong(v);
+ checkFallback();
+ }
+
+ public void writeFloat(float v) {
+ rawDataByteSize += 4;
+ currentWriter.writeFloat(v);
+ checkFallback();
+ }
+
+ public void writeDouble(double v) {
+ rawDataByteSize += 8;
+ currentWriter.writeDouble(v);
+ checkFallback();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
new file mode 100644
index 0000000..f567803
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+public class BinaryPlainValuesReader extends ValuesReader {
+ private static final Log LOG = Log.getLog(BinaryPlainValuesReader.class);
+ private byte[] in;
+ private int offset;
+
+ @Override
+ public Binary readBytes() {
+ try {
+ int length = BytesUtils.readIntLittleEndian(in, offset);
+ int start = offset + 4;
+ offset = start + length;
+ return Binary.fromByteArray(in, start, length);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+ } catch (RuntimeException e) {
+ throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+ }
+ }
+
+ @Override
+ public void skip() {
+ try {
+ int length = BytesUtils.readIntLittleEndian(in, offset);
+ offset += 4 + length;
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+ } catch (RuntimeException e) {
+ throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+ }
+ }
+
+ @Override
+ public void initFromPage(int valueCount, byte[] in, int offset)
+ throws IOException {
+ if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+ this.in = in;
+ this.offset = offset;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
new file mode 100644
index 0000000..31e711f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
+
+/**
+ * encodes boolean for the plain encoding: one bit at a time (0 = false)
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BooleanPlainValuesReader extends ValuesReader {
+ private static final Log LOG = Log.getLog(BooleanPlainValuesReader.class);
+
+ private ByteBitPackingValuesReader in = new ByteBitPackingValuesReader(1, LITTLE_ENDIAN);
+
+ /**
+ *
+ * {@inheritDoc}
+ * @see org.apache.parquet.column.values.ValuesReader#readBoolean()
+ */
+ @Override
+ public boolean readBoolean() {
+ return in.readInteger() == 0 ? false : true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see org.apache.parquet.column.values.ValuesReader#skipBoolean()
+ */
+ @Override
+ public void skip() {
+ in.readInteger();
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see org.apache.parquet.column.values.ValuesReader#initFromPage(byte[], int)
+ */
+ @Override
+ public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+ if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+ this.in.initFromPage(valueCount, in, offset);
+ }
+
+ @Override
+ public int getNextOffset() {
+ return this.in.getNextOffset();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
new file mode 100644
index 0000000..78920f0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesWriter;
+
+
+/**
+ * An implementation of the PLAIN encoding
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BooleanPlainValuesWriter extends ValuesWriter {
+
+ private ByteBitPackingValuesWriter bitPackingWriter;
+
+ public BooleanPlainValuesWriter() {
+ bitPackingWriter = new ByteBitPackingValuesWriter(1, LITTLE_ENDIAN);
+ }
+
+ @Override
+ public final void writeBoolean(boolean v) {
+ bitPackingWriter.writeInteger(v ? 1 : 0);
+ }
+
+ @Override
+ public long getBufferedSize() {
+ return bitPackingWriter.getBufferedSize();
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ return bitPackingWriter.getBytes();
+ }
+
+ @Override
+ public void reset() {
+ bitPackingWriter.reset();
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ return bitPackingWriter.getAllocatedSize();
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ return PLAIN;
+ }
+
+ @Override
+ public String memUsageString(String prefix) {
+ return bitPackingWriter.memUsageString(prefix);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
new file mode 100644
index 0000000..3a7d245
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import java.io.IOException;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+import static org.apache.parquet.Log.DEBUG;
+
+/**
+ * ValuesReader for FIXED_LEN_BYTE_ARRAY.
+ *
+ * @author David Z. Chen <dc...@linkedin.com>
+ */
+public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
+ private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesReader.class);
+ private byte[] in;
+ private int offset;
+ private int length;
+
+ public FixedLenByteArrayPlainValuesReader(int length) {
+ this.length = length;
+ }
+
+ @Override
+ public Binary readBytes() {
+ try {
+ int start = offset;
+ offset = start + length;
+ return Binary.fromByteArray(in, start, length);
+ } catch (RuntimeException e) {
+ throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+ }
+ }
+
+ @Override
+ public void skip() {
+ offset += length;
+ }
+
+ @Override
+ public void initFromPage(int valueCount, byte[] in, int offset)
+ throws IOException {
+ if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+ this.in = in;
+ this.offset = offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
new file mode 100644
index 0000000..986ae0b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * ValuesWriter for FIXED_LEN_BYTE_ARRAY.
+ *
+ * @author David Z. Chen <dc...@linkedin.com>
+ */
+public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
+ private static final Log LOG = Log.getLog(PlainValuesWriter.class);
+
+ private CapacityByteArrayOutputStream arrayOut;
+ private LittleEndianDataOutputStream out;
+ private int length;
+
+ public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) {
+ this.length = length;
+ this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+ this.out = new LittleEndianDataOutputStream(arrayOut);
+ }
+
+ @Override
+ public final void writeBytes(Binary v) {
+ if (v.length() != length) {
+ throw new IllegalArgumentException("Fixed Binary size " + v.length() +
+ " does not match field type length " + length);
+ }
+ try {
+ v.writeTo(out);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write fixed bytes", e);
+ }
+ }
+
+ @Override
+ public long getBufferedSize() {
+ return arrayOut.size();
+ }
+
+ @Override
+ public BytesInput getBytes() {
+ try {
+ out.flush();
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page", e);
+ }
+ if (Log.DEBUG) LOG.debug("writing a buffer of size " + arrayOut.size());
+ return BytesInput.from(arrayOut);
+ }
+
+ @Override
+ public void reset() {
+ arrayOut.reset();
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ return arrayOut.getCapacity();
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ return Encoding.PLAIN;
+ }
+
+ @Override
+ public String memUsageString(String prefix) {
+ return arrayOut.memUsageString(prefix + " PLAIN");
+ }
+}