You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:38 UTC

[41/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
new file mode 100644
index 0000000..1b3692f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
@@ -0,0 +1,269 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetEncodingException;
+
+import java.io.IOException;
+
+/**
+ * Write integers with delta encoding and binary packing.
+ * The format is as follows:
+ * <p/>
+ * <pre>
+ *   {@code
+ *     delta-binary-packing: <page-header> <block>*
+ *     page-header := <block size in values> <number of miniblocks in a block> <total value count> <first value>
+ *     block := <min delta> <list of bitwidths of miniblocks> <miniblocks>
+ *
+ *     min delta : zig-zag var int encoded
+ *     bitWidthsOfMiniBlock : 1 byte little endian
+ *     blockSizeInValues,miniBlockNumInABlock,totalValueCount : unsigned varint
+ *     firstValue : zig-zag var int encoded
+ *   }
+ * </pre>
+ *
+ * The algorithm and format is inspired by D. Lemire's paper: http://lemire.me/blog/archives/2012/09/12/fast-integer-compression-decoding-billions-of-integers-per-second/
+ *
+ * @author Tianshuo Deng
+ */
+public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
+  /**
+   * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+   * reused between flushes.
+   */
+  public static final int MAX_BITWIDTH = 32;
+
+  public static final int DEFAULT_NUM_BLOCK_VALUES = 128;
+
+  public static final int DEFAULT_NUM_MINIBLOCKS = 4;
+
+  /**
+   * receives the encoded blocks; the page header is only prepended in {@link #getBytes()}
+   */
+  private final CapacityByteArrayOutputStream baos;
+
+  /**
+   * stores blockSizeInValues, miniBlockNumInABlock and miniBlockSizeInValues
+   */
+  private final DeltaBinaryPackingConfig config;
+
+  /**
+   * bit width for each mini block, reused between flushes
+   */
+  private final int[] bitWidths;
+
+  /**
+   * number of values written since construction or the last reset(), including the first value
+   */
+  private int totalValueCount = 0;
+
+  /**
+   * a pointer to deltaBlockBuffer indicating the end of deltaBlockBuffer:
+   * the number of values in the deltaBlockBuffer that haven't been flushed to baos.
+   * It is reset after each flush
+   */
+  private int deltaValuesToFlush = 0;
+
+  /**
+   * stores delta values starting from the 2nd value written (1st value is stored in header).
+   * It's reused between flushes
+   */
+  private final int[] deltaBlockBuffer;
+
+  /**
+   * bytes buffer for a mini block, it is reused for each mini block.
+   * Therefore the size of the biggest miniblock with a bitwidth of MAX_BITWIDTH is allocated
+   */
+  private final byte[] miniBlockByteBuffer;
+
+  /**
+   * firstValue is written to the header of the page
+   */
+  private int firstValue = 0;
+
+  /**
+   * cache previous written value for calculating delta
+   */
+  private int previousValue = 0;
+
+  /**
+   * min delta is written to the beginning of each block.
+   * It's zig-zag encoded. The deltas stored in each block are actually the difference to min delta.
+   * It is reset after each flush
+   */
+  private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+
+  /**
+   * Creates a writer using {@link #DEFAULT_NUM_BLOCK_VALUES} and {@link #DEFAULT_NUM_MINIBLOCKS}.
+   *
+   * @param slabSize passed through to the underlying {@link CapacityByteArrayOutputStream}
+   * @param pageSize passed through to the underlying {@link CapacityByteArrayOutputStream}
+   */
+  public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) {
+    this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize);
+  }
+
+  /**
+   * @param blockSizeInValues number of deltas buffered before a block is flushed
+   * @param miniBlockNum number of mini blocks each block is divided into
+   * @param slabSize passed through to the underlying {@link CapacityByteArrayOutputStream}
+   * @param pageSize passed through to the underlying {@link CapacityByteArrayOutputStream}
+   */
+  public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) {
+    this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
+    bitWidths = new int[config.miniBlockNumInABlock];
+    deltaBlockBuffer = new int[blockSizeInValues];
+    miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
+    baos = new CapacityByteArrayOutputStream(slabSize, pageSize);
+  }
+
+  /**
+   * @return the number of bytes already flushed to the output buffer; deltas still waiting in
+   *         the block buffer and the page header are not counted
+   */
+  @Override
+  public long getBufferedSize() {
+    return baos.size();
+  }
+
+  /**
+   * Buffers one value. The very first value is only remembered for the page header; every
+   * following value is stored as the delta to its predecessor, and the block is flushed as
+   * soon as blockSizeInValues deltas have been collected.
+   *
+   * @param v the value to write
+   */
+  @Override
+  public void writeInteger(int v) {
+    totalValueCount++;
+
+    if (totalValueCount == 1) {
+      firstValue = v;
+      previousValue = firstValue;
+      return;
+    }
+
+    int delta = v - previousValue;//calculate delta
+    previousValue = v;
+
+    deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+    if (delta < minDeltaInCurrentBlock) {
+      minDeltaInCurrentBlock = delta;
+    }
+
+    if (config.blockSizeInValues == deltaValuesToFlush) {
+      flushBlockBuffer();
+    }
+  }
+
+  /**
+   * Encodes the buffered deltas as one block (min delta, bit widths, packed mini blocks),
+   * appends it to baos and resets the per-block state.
+   */
+  private void flushBlockBuffer() {
+    //since we store the min delta, the deltas are converted to be the difference to min delta
+    for (int i = 0; i < deltaValuesToFlush; i++) {
+      deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+    }
+
+    writeMinDelta();
+    int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+    // A mini block is always written in full. Zero the slots behind the last real delta so the
+    // padding of a partially filled mini block does not depend on what an earlier block left
+    // in the reused buffer.
+    int paddedEnd = Math.min(miniBlocksToFlush * config.miniBlockSizeInValues, deltaBlockBuffer.length);
+    for (int i = deltaValuesToFlush; i < paddedEnd; i++) {
+      deltaBlockBuffer[i] = 0;
+    }
+
+    calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+    // the bit width of every mini block is written, including the ones that carry no data
+    for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+      writeBitWidthForMiniBlock(i);
+    }
+
+    for (int i = 0; i < miniBlocksToFlush; i++) {
+      //writing i th miniblock
+      int currentBitWidth = bitWidths[i];
+      BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
+      int miniBlockStart = i * config.miniBlockSizeInValues;
+      for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {//8 values per pack
+        // mini block is atomic in terms of flushing
+        // This may write more values than were buffered when the end of the data does not
+        // align with the end of the last mini block, but it doesn't matter:
+        // the reader uses the total count to see if it reached the end.
+        packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, 0);
+        // 8 values of currentBitWidth bits each occupy exactly currentBitWidth bytes
+        baos.write(miniBlockByteBuffer, 0, currentBitWidth);
+      }
+    }
+
+    minDeltaInCurrentBlock = Integer.MAX_VALUE;
+    deltaValuesToFlush = 0;
+  }
+
+  /**
+   * Writes the bit width of the i th mini block as a single byte.
+   *
+   * @param i index of the mini block in the current block
+   */
+  private void writeBitWidthForMiniBlock(int i) {
+    try {
+      BytesUtils.writeIntLittleEndianOnOneByte(baos, bitWidths[i]);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("can not write bitwith for miniblock", e);
+    }
+  }
+
+  /**
+   * Writes the min delta of the current block as a zig-zag var int.
+   */
+  private void writeMinDelta() {
+    try {
+      BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, baos);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("can not write min delta for block", e);
+    }
+  }
+
+  /**
+   * iterate through values in each mini block and calculate the bitWidths of max values.
+   * Mini blocks that will not be flushed get a bit width of 0, so that no width computed for
+   * an earlier block leaks into the header of this block.
+   *
+   * @param miniBlocksToFlush number of mini blocks that contain at least one buffered delta
+   */
+  private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+    for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+
+      int mask = 0;
+      int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
+
+      //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
+      int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
+
+      // OR-ing all values keeps the highest set bit of the largest value
+      for (int i = miniStart; i < miniEnd; i++) {
+        mask |= deltaBlockBuffer[i];
+      }
+      bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
+    }
+
+    for (int unused = miniBlocksToFlush; unused < bitWidths.length; unused++) {
+      bitWidths[unused] = 0;
+    }
+  }
+
+  /**
+   * @param numberCount number of buffered deltas
+   * @return the number of mini blocks needed to hold numberCount values, rounded up
+   */
+  private int getMiniBlockCountToFlush(int numberCount) {
+    // integer ceiling division, no floating point round trip needed
+    return (numberCount + config.miniBlockSizeInValues - 1) / config.miniBlockSizeInValues;
+  }
+
+  /**
+   * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+   *
+   * @return the page header (config, total value count, first value) followed by all encoded blocks
+   */
+  @Override
+  public BytesInput getBytes() {
+    //The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+    if (deltaValuesToFlush != 0) {
+      flushBlockBuffer();
+    }
+    return BytesInput.concat(
+            config.toBytesInput(),
+            BytesInput.fromUnsignedVarInt(totalValueCount),
+            BytesInput.fromZigZagVarInt(firstValue),
+            BytesInput.from(baos));
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return Encoding.DELTA_BINARY_PACKED;
+  }
+
+  /**
+   * Discards everything written so far and puts the writer back into its initial state.
+   */
+  @Override
+  public void reset() {
+    this.totalValueCount = 0;
+    this.baos.reset();
+    this.deltaValuesToFlush = 0;
+    this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+    // otherwise an empty page written after a reset would carry the old first value in its header
+    this.firstValue = 0;
+    this.previousValue = 0;
+  }
+
+  /**
+   * @return the capacity of the output buffer; the fixed size block buffers are not included
+   */
+  @Override
+  public long getAllocatedSize() {
+    return baos.getCapacity();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return String.format("%s DeltaBinaryPacking %d bytes", prefix, getAllocatedSize());
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
new file mode 100644
index 0000000..2db1336
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -0,0 +1,70 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltalengthbytearray;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reads binary data written by {@link DeltaLengthByteArrayValuesWriter}:
+ * all lengths first (delta binary packed), followed by the concatenated byte arrays.
+ *
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaLengthByteArrayValuesReader extends ValuesReader {
+
+  private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesReader.class);
+
+  /** decodes the length of each value */
+  private final ValuesReader lengthReader;
+  /** the page being read */
+  private byte[] in;
+  /** position in the page of the next value's bytes */
+  private int offset;
+
+  public DeltaLengthByteArrayValuesReader() {
+    lengthReader = new DeltaBinaryPackingValuesReader();
+  }
+
+  /**
+   * Initializes the length reader at the given offset and positions this reader on the
+   * first byte that follows the encoded lengths.
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset)
+      throws IOException {
+    if (DEBUG) {
+      LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    }
+    lengthReader.initFromPage(valueCount, in, offset);
+    // the value bytes start right where the lengths end
+    final int dataStart = lengthReader.getNextOffset();
+    this.in = in;
+    this.offset = dataStart;
+  }
+
+  /**
+   * @return the next value, as a {@link Binary} created over the page array without copying here
+   */
+  @Override
+  public Binary readBytes() {
+    final int valueLength = lengthReader.readInteger();
+    final int valueStart = offset;
+    offset += valueLength;
+    return Binary.fromByteArray(in, valueStart, valueLength);
+  }
+
+  /**
+   * Consumes the next length and moves past the corresponding bytes.
+   */
+  @Override
+  public void skip() {
+    final int valueLength = lengthReader.readInteger();
+    offset += valueLength;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
new file mode 100644
index 0000000..0a498b1
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
@@ -0,0 +1,106 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltalengthbytearray;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Write lengths of byte-arrays using delta encoding, followed by concatenated byte-arrays
+ * <pre>
+ *   {@code
+ *   delta-length-byte-array : length* byte-array*
+ *   }
+ * </pre>
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
+
+  private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesWriter.class);
+
+  /** encodes the length of every value */
+  private final ValuesWriter lengthWriter;
+  /** collects the raw bytes of every value */
+  private final CapacityByteArrayOutputStream arrayOut;
+  /** stream view over arrayOut used for writing */
+  private final LittleEndianDataOutputStream out;
+
+  /**
+   * @param initialSize passed to both the length writer and the byte buffer
+   * @param pageSize passed to both the length writer and the byte buffer
+   */
+  public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
+    this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+    this.out = new LittleEndianDataOutputStream(this.arrayOut);
+    this.lengthWriter = new DeltaBinaryPackingValuesWriter(
+        DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
+        DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
+        initialSize, pageSize);
+  }
+
+  /**
+   * Records the length of the value, then appends its bytes.
+   *
+   * @param v the value to write
+   */
+  @Override
+  public void writeBytes(Binary v) {
+    try {
+      final int valueLength = v.length();
+      lengthWriter.writeInteger(valueLength);
+      final byte[] valueBytes = v.getBytes();
+      out.write(valueBytes);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write bytes", e);
+    }
+  }
+
+  /**
+   * @return buffered size of the lengths plus the size of the value bytes
+   */
+  @Override
+  public long getBufferedSize() {
+    final long lengthsSize = lengthWriter.getBufferedSize();
+    return lengthsSize + arrayOut.size();
+  }
+
+  /**
+   * @return the encoded lengths followed by the concatenated value bytes
+   */
+  @Override
+  public BytesInput getBytes() {
+    try {
+      out.flush();
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page", e);
+    }
+    if (Log.DEBUG) {
+      LOG.debug("writing a buffer of size " + arrayOut.size());
+    }
+    final BytesInput lengths = lengthWriter.getBytes();
+    return BytesInput.concat(lengths, BytesInput.from(arrayOut));
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return Encoding.DELTA_LENGTH_BYTE_ARRAY;
+  }
+
+  /**
+   * Resets the length writer and the byte buffer.
+   */
+  @Override
+  public void reset() {
+    lengthWriter.reset();
+    arrayOut.reset();
+  }
+
+  /**
+   * @return allocated size of the length writer plus the capacity of the byte buffer
+   */
+  @Override
+  public long getAllocatedSize() {
+    final long lengthsAllocated = lengthWriter.getAllocatedSize();
+    return lengthsAllocated + arrayOut.getCapacity();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    final String lengthsUsage = lengthWriter.memUsageString(prefix);
+    return arrayOut.memUsageString(lengthsUsage + " DELTA_LENGTH_BYTE_ARRAY");
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
new file mode 100644
index 0000000..de3df02
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -0,0 +1,78 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltastrings;
+
+import java.io.IOException;
+
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reads binary data written by {@link DeltaByteArrayWriter}: every value is rebuilt from a
+ * prefix shared with the previously decoded value plus its own suffix.
+ * 
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaByteArrayReader extends ValuesReader {
+  /** decodes, for each value, how many leading bytes it shares with the previous value */
+  private final ValuesReader prefixLengthReader;
+  /** decodes the bytes that follow the shared prefix */
+  private final ValuesReader suffixReader;
+
+  /** the last decoded value; the prefix of the next value is copied from it */
+  private Binary previous;
+
+  public DeltaByteArrayReader() {
+    this.prefixLengthReader = new DeltaBinaryPackingValuesReader();
+    this.suffixReader = new DeltaLengthByteArrayValuesReader();
+    this.previous = Binary.fromByteArray(new byte[0]);
+  }
+
+  /**
+   * Initializes the prefix length reader at the given offset and the suffix reader right
+   * behind the encoded prefix lengths.
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset)
+      throws IOException {
+    prefixLengthReader.initFromPage(valueCount, page, offset);
+    int next = prefixLengthReader.getNextOffset();
+    suffixReader.initFromPage(valueCount, page, next);
+  }
+
+  /**
+   * Skips one value. The value still has to be decoded: the next value is expressed relative
+   * to it, so merely advancing the two underlying readers would leave {@code previous} stale
+   * and make the following {@link #readBytes()} rebuild its prefix from the wrong value.
+   */
+  @Override
+  public void skip() {
+    readBytes();
+  }
+
+  /**
+   * @return the next value: the first prefixLength bytes of the previous value followed by
+   *         the suffix read from the page
+   */
+  @Override
+  public Binary readBytes() {
+    int prefixLength = prefixLengthReader.readInteger();
+    // This does not copy bytes
+    Binary suffix = suffixReader.readBytes();
+    int length = prefixLength + suffix.length();
+
+    // We have to do this to materialize the output
+    if (prefixLength != 0) {
+      byte[] out = new byte[length];
+      System.arraycopy(previous.getBytes(), 0, out, 0, prefixLength);
+      System.arraycopy(suffix.getBytes(), 0, out, prefixLength, suffix.length());
+      previous = Binary.fromByteArray(out);
+    } else {
+      // nothing shared with the previous value: the suffix is the whole value
+      previous = suffix;
+    }
+    return previous;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
new file mode 100644
index 0000000..8c71d5d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
@@ -0,0 +1,92 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.deltastrings;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Write prefix lengths using delta encoding, followed by suffixes with Delta length byte arrays
+ * <pre>
+ *   {@code
+ *   delta-length-byte-array : prefix-length* suffixes*
+ *   }
+ * </pre>
+ * @author Aniket Mokashi
+ *
+ */
+public class DeltaByteArrayWriter extends ValuesWriter{
+
+  private ValuesWriter prefixLengthWriter;
+  private ValuesWriter suffixWriter;
+  private byte[] previous;
+
+  public DeltaByteArrayWriter(int initialCapacity, int pageSize) {
+    this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize);
+    this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize);
+    this.previous = new byte[0];
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return prefixLengthWriter.getBufferedSize() + suffixWriter.getBufferedSize();
+  }
+
+  @Override
+  public BytesInput getBytes() {
+    return BytesInput.concat(prefixLengthWriter.getBytes(), suffixWriter.getBytes());
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return Encoding.DELTA_BYTE_ARRAY;
+  }
+
+  @Override
+  public void reset() {
+    prefixLengthWriter.reset();
+    suffixWriter.reset();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return prefixLengthWriter.getAllocatedSize() + suffixWriter.getAllocatedSize();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    prefix = prefixLengthWriter.memUsageString(prefix);
+    return suffixWriter.memUsageString(prefix + "  DELTA_STRINGS");
+  }
+
+  @Override
+  public void writeBytes(Binary v) {
+    int i = 0;
+    byte[] vb = v.getBytes();
+    int length = previous.length < vb.length ? previous.length : vb.length;
+    for(i = 0; (i < length) && (previous[i] == vb[i]); i++);
+    prefixLengthWriter.writeInteger(i);
+    suffixWriter.writeBytes(Binary.fromByteArray(vb, i, vb.length - i));
+    previous = vb;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
new file mode 100644
index 0000000..972c87e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -0,0 +1,135 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reads values that have been dictionary encoded
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class DictionaryValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(DictionaryValuesReader.class);
+
+  /** stream over the current page, starting at the offset given to initFromPage */
+  private ByteArrayInputStream in;
+
+  /** used to turn dictionary ids into values */
+  private Dictionary dictionary;
+
+  /** decodes the dictionary ids of the current page */
+  private RunLengthBitPackingHybridDecoder decoder;
+
+  /**
+   * @param dictionary the dictionary the ids read from the page refer to
+   */
+  public DictionaryValuesReader(Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  /**
+   * Sets up the id decoder for a page. The first byte at the offset is read as the bit width
+   * of the ids. When there are no bytes at or after the offset, a decoder that fails on
+   * every read is installed instead.
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset)
+      throws IOException {
+    final int remaining = page.length - offset;
+    this.in = new ByteArrayInputStream(page, offset, remaining);
+
+    if (remaining <= 0) {
+      decoder = new RunLengthBitPackingHybridDecoder(1, in) {
+        @Override
+        public int readInt() throws IOException {
+          throw new IOException("Attempt to read from empty page");
+        }
+      };
+      return;
+    }
+
+    if (DEBUG) {
+      LOG.debug("init from page at offset " + offset + " for length " + remaining);
+    }
+    final int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
+    if (DEBUG) {
+      LOG.debug("bit width " + bitWidth);
+    }
+    decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
+  }
+
+  /**
+   * Reads the next dictionary id from the page.
+   *
+   * @return the id read by the decoder
+   * @throws ParquetDecodingException wrapping any IOException raised by the decoder
+   */
+  private int nextId() {
+    try {
+      return decoder.readInt();
+    } catch (IOException e) {
+      throw new ParquetDecodingException(e);
+    }
+  }
+
+  @Override
+  public int readValueDictionaryId() {
+    return nextId();
+  }
+
+  @Override
+  public Binary readBytes() {
+    final int id = nextId();
+    return dictionary.decodeToBinary(id);
+  }
+
+  @Override
+  public float readFloat() {
+    final int id = nextId();
+    return dictionary.decodeToFloat(id);
+  }
+
+  @Override
+  public double readDouble() {
+    final int id = nextId();
+    return dictionary.decodeToDouble(id);
+  }
+
+  @Override
+  public int readInteger() {
+    final int id = nextId();
+    return dictionary.decodeToInt(id);
+  }
+
+  @Override
+  public long readLong() {
+    final int id = nextId();
+    return dictionary.decodeToLong(id);
+  }
+
+  @Override
+  public void skip() {
+    // Type does not matter as we are just skipping dictionary keys
+    nextId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
new file mode 100644
index 0000000..928c125
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -0,0 +1,625 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.bytes.BytesInput.concat;
+import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.doubles.Double2IntMap;
+import it.unimi.dsi.fastutil.doubles.DoubleIterator;
+import it.unimi.dsi.fastutil.floats.Float2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.floats.Float2IntMap;
+import it.unimi.dsi.fastutil.floats.FloatIterator;
+import it.unimi.dsi.fastutil.ints.Int2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.RequiresFallback;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.dictionary.IntList.IntIterator;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Will attempt to encode values using a dictionary and fall back to plain encoding
+ *  if the dictionary gets too big
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class DictionaryValuesWriter extends ValuesWriter implements RequiresFallback {
+  private static final Log LOG = Log.getLog(DictionaryValuesWriter.class);
+
+  /* max entries allowed for the dictionary; shouldFallBack() reports true once the dictionary holds more than this */
+  private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
+  /* lower bound handed to the initial slab size heuristic in getBytes() */
+  private static final int MIN_INITIAL_SLAB_SIZE = 64;
+
+  /* encoding to label the data page */
+  private final Encoding encodingForDataPage;
+
+  /* encoding to label the dictionary page */
+  protected final Encoding encodingForDictionaryPage;
+
+  /* maximum size in bytes allowed for the dictionary; shouldFallBack() reports true once dictionaryByteSize exceeds it */
+  protected final int maxDictionaryByteSize;
+
+  /* will become true if the dictionary becomes too big (this class only ever resets it to false, in resetDictionary()) */
+  protected boolean dictionaryTooBig;
+
+  /* current size in bytes the dictionary will take once serialized */
+  protected int dictionaryByteSize;
+
+  /* size in bytes of the dictionary at the end of last dictionary encoded page (in case the current page falls back to PLAIN) */
+  protected int lastUsedDictionaryByteSize;
+
+  /* size in items of the dictionary at the end of last dictionary encoded page (in case the current page falls back to PLAIN) */
+  protected int lastUsedDictionarySize;
+
+  /* dictionary encoded values: one dictionary id per value written since the last reset() */
+  protected IntList encodedValues = new IntList();
+
+  /**
+   * @param maxDictionaryByteSize maximum size in bytes allowed for the dictionary
+   * @param encodingForDataPage encoding to label the data page
+   * @param encodingForDictionaryPage encoding to label the dictionary page
+   */
+  protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+    this.encodingForDictionaryPage = encodingForDictionaryPage;
+    this.encodingForDataPage = encodingForDataPage;
+    this.maxDictionaryByteSize = maxDictionaryByteSize;
+  }
+
+  /**
+   * Builds the dictionary page from the bytes accumulated in the given encoder.
+   *
+   * @param dictionaryEncoder the writer the dictionary entries were written to
+   * @return a page holding those bytes, labelled with lastUsedDictionarySize entries
+   *         and the dictionary page encoding
+   */
+  protected DictionaryPage dictPage(ValuesWriter dictionaryEncoder) {
+    final BytesInput dictionaryBytes = dictionaryEncoder.getBytes();
+    return new DictionaryPage(dictionaryBytes, lastUsedDictionarySize, encodingForDictionaryPage);
+  }
+
+  /**
+   * @return true if the dictionary exceeded the max byte size or holds more than
+   *         MAX_DICTIONARY_ENTRIES entries
+   */
+  @Override
+  public boolean shouldFallBack() {
+    // if the dictionary reaches the max byte size or the values can not be encoded on 4 bytes anymore.
+    if (dictionaryByteSize > maxDictionaryByteSize) {
+      return true;
+    }
+    return getDictionarySize() > MAX_DICTIONARY_ENTRIES;
+  }
+
+  /**
+   * @param rawSize size of the raw data
+   * @param encodedSize size of the dictionary encoded data
+   * @return true if the encoded data plus the dictionary is strictly smaller than the raw data
+   */
+  @Override
+  public boolean isCompressionSatisfying(long rawSize, long encodedSize) {
+    final long encodedSizeWithDictionary = encodedSize + dictionaryByteSize;
+    return encodedSizeWithDictionary < rawSize;
+  }
+
+  /**
+   * Replays the buffered values into the given writer and, if no page ever used the
+   * dictionary, frees the dictionary and the buffered ids.
+   *
+   * @param writer the writer to fall back to
+   */
+  @Override
+  public void fallBackAllValuesTo(ValuesWriter writer) {
+    fallBackDictionaryEncodedData(writer);
+    if (lastUsedDictionarySize != 0) {
+      // a previous page used the dictionary: keep its content
+      return;
+    }
+    // if we never used the dictionary
+    // we free dictionary encoded data
+    clearDictionaryContent();
+    dictionaryByteSize = 0;
+    encodedValues = new IntList();
+  }
+
+  /**
+   * Writes the buffered dictionary-encoded values to the given writer.
+   * The implementations in this file map each buffered id back to its dictionary entry
+   * and hand that entry to the writer, in the order the values were written.
+   *
+   * @param writer the writer receiving the decoded values
+   */
+  abstract protected void fallBackDictionaryEncodedData(ValuesWriter writer);
+
+  /**
+   * @return the size in bytes of the buffered dictionary ids, counting 4 bytes per id
+   */
+  @Override
+  public long getBufferedSize() {
+    // multiply as long: the int product wraps around once more than Integer.MAX_VALUE / 4 ids are buffered
+    return encodedValues.size() * 4L;
+  }
+
+  /**
+   * @return the size used in memory: 4 bytes per buffered dictionary id plus the dictionary byte size
+   */
+  @Override
+  public long getAllocatedSize() {
+    // size used in memory
+    // multiply as long: the int product wraps around once more than Integer.MAX_VALUE / 4 ids are buffered
+    return encodedValues.size() * 4L + dictionaryByteSize;
+  }
+
+  /**
+   * Encodes the buffered dictionary ids with the RLE/bit-packing hybrid encoder.
+   * The result starts with one byte holding the bit width, followed by the encoded ids.
+   * As a side effect the current dictionary size (in items and in bytes) is remembered
+   * as the last used one.
+   *
+   * @return the bit width header concatenated with the encoded ids
+   * @throws ParquetEncodingException if the encoder raises an IOException
+   */
+  @Override
+  public BytesInput getBytes() {
+    final int maxDicId = getDictionarySize() - 1;
+    if (DEBUG) {
+      LOG.debug("max dic id " + maxDicId);
+    }
+    final int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
+    final int initialSlabSize =
+        CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
+    final RunLengthBitPackingHybridEncoder encoder =
+        new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
+
+    try {
+      for (IntIterator ids = encodedValues.iterator(); ids.hasNext(); ) {
+        encoder.writeInt(ids.next());
+      }
+      // encodes the bit width
+      final BytesInput header = BytesInput.from(new byte[] { (byte) bitWidth });
+      final BytesInput rleEncodedBytes = encoder.toBytes();
+      if (DEBUG) {
+        LOG.debug("rle encoded bytes " + rleEncodedBytes.size());
+      }
+      final BytesInput page = concat(header, rleEncodedBytes);
+      // remember size of dictionary when we last wrote a page
+      lastUsedDictionarySize = getDictionarySize();
+      lastUsedDictionaryByteSize = dictionaryByteSize;
+      return page;
+    } catch (IOException ioe) {
+      throw new ParquetEncodingException("could not encode the values", ioe);
+    }
+  }
+
+  /**
+   * @return the encoding used to label the data page
+   */
+  @Override
+  public Encoding getEncoding() {
+    return this.encodingForDataPage;
+  }
+
+  /**
+   * Drops the buffered dictionary ids by starting a new empty list.
+   * The dictionary content itself is left untouched.
+   */
+  @Override
+  public void reset() {
+    this.encodedValues = new IntList();
+  }
+
+  /**
+   * Forgets the dictionary: resets the last used sizes and the too-big flag, clears the
+   * dictionary content and resets the byte size accounted for it.
+   */
+  @Override
+  public void resetDictionary() {
+    lastUsedDictionaryByteSize = 0;
+    lastUsedDictionarySize = 0;
+    dictionaryTooBig = false;
+    clearDictionaryContent();
+    // the dictionary is empty again, so its serialized size must restart from zero;
+    // otherwise shouldFallBack() and isCompressionSatisfying() keep counting the cleared entries
+    dictionaryByteSize = 0;
+  }
+
+  /**
+   * clear/free the underlying type specific dictionary content.
+   * The implementations in this file only empty their value-to-id map;
+   * the size counters of this class are maintained by the callers.
+   */
+  protected abstract void clearDictionaryContent();
+
+  /**
+   * @return the current size of the dictionary in items (number of distinct entries)
+   */
+  protected abstract int getDictionarySize();
+
+  /**
+   * Describes the memory used by this writer: the dictionary byte size and the size
+   * of the buffered ids (4 bytes each).
+   *
+   * @param prefix prepended to every line of the description
+   * @return the multi-line description
+   */
+  @Override
+  public String memUsageString(String prefix) {
+    // multiply as long: the int product wraps around once more than Integer.MAX_VALUE / 4 ids are buffered
+    final long valuesByteSize = encodedValues.size() * 4L;
+    return String.format(
+        "%s DictionaryValuesWriter{\n"
+          + "%s\n"
+          + "%s\n"
+        + "%s}\n",
+        prefix,
+        prefix + " dict:" + dictionaryByteSize,
+        prefix + " values:" + valuesByteSize,
+        prefix
+        );
+  }
+
+  /**
+   * Dictionary writer for binary values.
+   * The dictionary page is produced with a PlainValuesWriter.
+   */
+  public static class PlainBinaryDictionaryValuesWriter extends DictionaryValuesWriter {
+
+    /* type specific dictionary content: value to dictionary id, -1 is returned for unknown values */
+    protected Object2IntMap<Binary> binaryDictionaryContent = new Object2IntLinkedOpenHashMap<Binary>();
+
+    /**
+     * @param maxDictionaryByteSize maximum size in bytes allowed for the dictionary
+     * @param encodingForDataPage encoding to label the data page
+     * @param encodingForDictionaryPage encoding to label the dictionary page
+     */
+    public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+      binaryDictionaryContent.defaultReturnValue(-1);
+    }
+
+    /**
+     * Buffers the dictionary id of v, adding a copy of v to the dictionary first when it is new.
+     */
+    @Override
+    public void writeBytes(Binary v) {
+      final int knownId = binaryDictionaryContent.getInt(v);
+      if (knownId != -1) {
+        encodedValues.add(knownId);
+        return;
+      }
+      final int newId = binaryDictionaryContent.size();
+      binaryDictionaryContent.put(copy(v), newId);
+      // length as int (4 bytes) + actual bytes
+      dictionaryByteSize += 4 + v.length();
+      encodedValues.add(newId);
+    }
+
+    /**
+     * @return a page with the first lastUsedDictionarySize entries, or null if the dictionary was never used
+     */
+    @Override
+    public DictionaryPage createDictionaryPage() {
+      if (lastUsedDictionarySize <= 0) {
+        // return a dictionary only if we actually used it
+        return null;
+      }
+      PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+      Iterator<Binary> usedEntries = binaryDictionaryContent.keySet().iterator();
+      // write only the part of the dict that we used
+      for (int written = 0; written < lastUsedDictionarySize; written++) {
+        dictionaryEncoder.writeBytes(usedEntries.next());
+      }
+      return dictPage(dictionaryEncoder);
+    }
+
+    @Override
+    public int getDictionarySize() {
+      return binaryDictionaryContent.size();
+    }
+
+    @Override
+    protected void clearDictionaryContent() {
+      binaryDictionaryContent.clear();
+    }
+
+    /**
+     * Writes every buffered value to the given writer, resolving ids through a reverse dictionary.
+     */
+    @Override
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+      //build reverse dictionary
+      final Binary[] valueById = new Binary[getDictionarySize()];
+      for (Object2IntMap.Entry<Binary> dictionaryEntry : binaryDictionaryContent.object2IntEntrySet()) {
+        valueById[dictionaryEntry.getIntValue()] = dictionaryEntry.getKey();
+      }
+
+      //fall back to plain encoding
+      for (IntIterator ids = encodedValues.iterator(); ids.hasNext(); ) {
+        writer.writeBytes(valueById[ids.next()]);
+      }
+    }
+
+    /**
+     * @return a Binary backed by a fresh copy of the first binary.length() bytes of binary.getBytes()
+     */
+    protected static Binary copy(Binary binary) {
+      final byte[] duplicate = Arrays.copyOf(binary.getBytes(), binary.length());
+      return Binary.fromByteArray(duplicate);
+    }
+  }
+
+  /**
+   * Dictionary writer for fixed length binary values.
+   * The dictionary page is produced with a FixedLenByteArrayPlainValuesWriter.
+   */
+  public static class PlainFixedLenArrayDictionaryValuesWriter extends PlainBinaryDictionaryValuesWriter {
+
+    /* number of bytes accounted for each dictionary entry */
+    private final int length;
+
+    /**
+     * @param maxDictionaryByteSize maximum size in bytes allowed for the dictionary
+     * @param length length in bytes of the values
+     * @param encodingForDataPage encoding to label the data page
+     * @param encodingForDictionaryPage encoding to label the dictionary page
+     */
+    public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+      this.length = length;
+    }
+
+    /**
+     * Buffers the dictionary id of value, adding a copy of value to the dictionary first when it is new.
+     */
+    @Override
+    public void writeBytes(Binary value) {
+      final int knownId = binaryDictionaryContent.getInt(value);
+      if (knownId != -1) {
+        encodedValues.add(knownId);
+        return;
+      }
+      final int newId = binaryDictionaryContent.size();
+      binaryDictionaryContent.put(copy(value), newId);
+      // no length prefix is accounted for fixed length entries
+      dictionaryByteSize += length;
+      encodedValues.add(newId);
+    }
+
+    /**
+     * @return a page with the first lastUsedDictionarySize entries, or null if the dictionary was never used
+     */
+    @Override
+    public DictionaryPage createDictionaryPage() {
+      if (lastUsedDictionarySize <= 0) {
+        // return a dictionary only if we actually used it
+        return null;
+      }
+      FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize);
+      Iterator<Binary> usedEntries = binaryDictionaryContent.keySet().iterator();
+      // write only the part of the dict that we used
+      for (int written = 0; written < lastUsedDictionarySize; written++) {
+        dictionaryEncoder.writeBytes(usedEntries.next());
+      }
+      return dictPage(dictionaryEncoder);
+    }
+  }
+
+  /**
+   * Dictionary writer for long values.
+   * The dictionary page is produced with a PlainValuesWriter.
+   */
+  public static class PlainLongDictionaryValuesWriter extends DictionaryValuesWriter {
+
+    /* type specific dictionary content: value to dictionary id, -1 is returned for unknown values */
+    private Long2IntMap longDictionaryContent = new Long2IntLinkedOpenHashMap();
+
+    /**
+     * @param maxDictionaryByteSize maximum size in bytes allowed for the dictionary
+     * @param encodingForDataPage encoding to label the data page
+     * @param encodingForDictionaryPage encoding to label the dictionary page
+     */
+    public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+      longDictionaryContent.defaultReturnValue(-1);
+    }
+
+    /**
+     * Buffers the dictionary id of v, adding v to the dictionary first when it is new.
+     */
+    @Override
+    public void writeLong(long v) {
+      final int knownId = longDictionaryContent.get(v);
+      if (knownId != -1) {
+        encodedValues.add(knownId);
+        return;
+      }
+      final int newId = longDictionaryContent.size();
+      longDictionaryContent.put(v, newId);
+      // 8 bytes are accounted for each new entry
+      dictionaryByteSize += 8;
+      encodedValues.add(newId);
+    }
+
+    /**
+     * @return a page with the first lastUsedDictionarySize entries, or null if the dictionary was never used
+     */
+    @Override
+    public DictionaryPage createDictionaryPage() {
+      if (lastUsedDictionarySize <= 0) {
+        // return a dictionary only if we actually used it
+        return null;
+      }
+      PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+      LongIterator usedEntries = longDictionaryContent.keySet().iterator();
+      // write only the part of the dict that we used
+      for (int written = 0; written < lastUsedDictionarySize; written++) {
+        dictionaryEncoder.writeLong(usedEntries.nextLong());
+      }
+      return dictPage(dictionaryEncoder);
+    }
+
+    @Override
+    public int getDictionarySize() {
+      return longDictionaryContent.size();
+    }
+
+    @Override
+    protected void clearDictionaryContent() {
+      longDictionaryContent.clear();
+    }
+
+    /**
+     * Writes every buffered value to the given writer, resolving ids through a reverse dictionary.
+     */
+    @Override
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+      //build reverse dictionary
+      final long[] valueById = new long[getDictionarySize()];
+      for (Long2IntMap.Entry dictionaryEntry : longDictionaryContent.long2IntEntrySet()) {
+        valueById[dictionaryEntry.getIntValue()] = dictionaryEntry.getLongKey();
+      }
+
+      //fall back to plain encoding
+      for (IntIterator ids = encodedValues.iterator(); ids.hasNext(); ) {
+        writer.writeLong(valueById[ids.next()]);
+      }
+    }
+  }
+
+  /**
+   * Dictionary writer for double values.
+   * The dictionary page is produced with a PlainValuesWriter.
+   */
+  public static class PlainDoubleDictionaryValuesWriter extends DictionaryValuesWriter {
+
+    /* type specific dictionary content: value to dictionary id, -1 is returned for unknown values */
+    private Double2IntMap doubleDictionaryContent = new Double2IntLinkedOpenHashMap();
+
+    /**
+     * @param maxDictionaryByteSize maximum size in bytes allowed for the dictionary
+     * @param encodingForDataPage encoding to label the data page
+     * @param encodingForDictionaryPage encoding to label the dictionary page
+     */
+    public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+      doubleDictionaryContent.defaultReturnValue(-1);
+    }
+
+    /**
+     * Buffers the dictionary id of v, adding v to the dictionary first when it is new.
+     */
+    @Override
+    public void writeDouble(double v) {
+      final int knownId = doubleDictionaryContent.get(v);
+      if (knownId != -1) {
+        encodedValues.add(knownId);
+        return;
+      }
+      final int newId = doubleDictionaryContent.size();
+      doubleDictionaryContent.put(v, newId);
+      // 8 bytes are accounted for each new entry
+      dictionaryByteSize += 8;
+      encodedValues.add(newId);
+    }
+
+    /**
+     * @return a page with the first lastUsedDictionarySize entries, or null if the dictionary was never used
+     */
+    @Override
+    public DictionaryPage createDictionaryPage() {
+      if (lastUsedDictionarySize <= 0) {
+        // return a dictionary only if we actually used it
+        return null;
+      }
+      PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+      DoubleIterator usedEntries = doubleDictionaryContent.keySet().iterator();
+      // write only the part of the dict that we used
+      for (int written = 0; written < lastUsedDictionarySize; written++) {
+        dictionaryEncoder.writeDouble(usedEntries.nextDouble());
+      }
+      return dictPage(dictionaryEncoder);
+    }
+
+    @Override
+    public int getDictionarySize() {
+      return doubleDictionaryContent.size();
+    }
+
+    @Override
+    protected void clearDictionaryContent() {
+      doubleDictionaryContent.clear();
+    }
+
+    /**
+     * Writes every buffered value to the given writer, resolving ids through a reverse dictionary.
+     */
+    @Override
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+      //build reverse dictionary
+      final double[] valueById = new double[getDictionarySize()];
+      for (Double2IntMap.Entry dictionaryEntry : doubleDictionaryContent.double2IntEntrySet()) {
+        valueById[dictionaryEntry.getIntValue()] = dictionaryEntry.getDoubleKey();
+      }
+
+      //fall back to plain encoding
+      for (IntIterator ids = encodedValues.iterator(); ids.hasNext(); ) {
+        writer.writeDouble(valueById[ids.next()]);
+      }
+    }
+  }
+
+  /**
+   * Dictionary writer for int values.
+   * The dictionary page is produced with a PlainValuesWriter.
+   */
+  public static class PlainIntegerDictionaryValuesWriter extends DictionaryValuesWriter {
+
+    /* type specific dictionary content: value to dictionary id, -1 is returned for unknown values */
+    private Int2IntMap intDictionaryContent = new Int2IntLinkedOpenHashMap();
+
+    /**
+     * @param maxDictionaryByteSize maximum size in bytes allowed for the dictionary
+     * @param encodingForDataPage encoding to label the data page
+     * @param encodingForDictionaryPage encoding to label the dictionary page
+     */
+    public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+      intDictionaryContent.defaultReturnValue(-1);
+    }
+
+    /**
+     * Buffers the dictionary id of v, adding v to the dictionary first when it is new.
+     */
+    @Override
+    public void writeInteger(int v) {
+      final int knownId = intDictionaryContent.get(v);
+      if (knownId != -1) {
+        encodedValues.add(knownId);
+        return;
+      }
+      final int newId = intDictionaryContent.size();
+      intDictionaryContent.put(v, newId);
+      // 4 bytes are accounted for each new entry
+      dictionaryByteSize += 4;
+      encodedValues.add(newId);
+    }
+
+    /**
+     * @return a page with the first lastUsedDictionarySize entries, or null if the dictionary was never used
+     */
+    @Override
+    public DictionaryPage createDictionaryPage() {
+      if (lastUsedDictionarySize <= 0) {
+        // return a dictionary only if we actually used it
+        return null;
+      }
+      PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+      // fully qualified: IntIterator alone refers to IntList.IntIterator in this file
+      it.unimi.dsi.fastutil.ints.IntIterator usedEntries = intDictionaryContent.keySet().iterator();
+      // write only the part of the dict that we used
+      for (int written = 0; written < lastUsedDictionarySize; written++) {
+        dictionaryEncoder.writeInteger(usedEntries.nextInt());
+      }
+      return dictPage(dictionaryEncoder);
+    }
+
+    @Override
+    public int getDictionarySize() {
+      return intDictionaryContent.size();
+    }
+
+    @Override
+    protected void clearDictionaryContent() {
+      intDictionaryContent.clear();
+    }
+
+    /**
+     * Writes every buffered value to the given writer, resolving ids through a reverse dictionary.
+     */
+    @Override
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+      //build reverse dictionary
+      final int[] valueById = new int[getDictionarySize()];
+      for (Int2IntMap.Entry dictionaryEntry : intDictionaryContent.int2IntEntrySet()) {
+        valueById[dictionaryEntry.getIntValue()] = dictionaryEntry.getIntKey();
+      }
+
+      //fall back to plain encoding
+      for (IntIterator ids = encodedValues.iterator(); ids.hasNext(); ) {
+        writer.writeInteger(valueById[ids.next()]);
+      }
+    }
+  }
+
+  /**
+   * Dictionary writer for float values.
+   * The dictionary page is produced with a PlainValuesWriter.
+   */
+  public static class PlainFloatDictionaryValuesWriter extends DictionaryValuesWriter {
+
+    /* type specific dictionary content: value to dictionary id, -1 is returned for unknown values */
+    private Float2IntMap floatDictionaryContent = new Float2IntLinkedOpenHashMap();
+
+    /**
+     * @param maxDictionaryByteSize maximum size in bytes allowed for the dictionary
+     * @param encodingForDataPage encoding to label the data page
+     * @param encodingForDictionaryPage encoding to label the dictionary page
+     */
+    public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+      floatDictionaryContent.defaultReturnValue(-1);
+    }
+
+    /**
+     * Buffers the dictionary id of v, adding v to the dictionary first when it is new.
+     */
+    @Override
+    public void writeFloat(float v) {
+      final int knownId = floatDictionaryContent.get(v);
+      if (knownId != -1) {
+        encodedValues.add(knownId);
+        return;
+      }
+      final int newId = floatDictionaryContent.size();
+      floatDictionaryContent.put(v, newId);
+      // 4 bytes are accounted for each new entry
+      dictionaryByteSize += 4;
+      encodedValues.add(newId);
+    }
+
+    /**
+     * @return a page with the first lastUsedDictionarySize entries, or null if the dictionary was never used
+     */
+    @Override
+    public DictionaryPage createDictionaryPage() {
+      if (lastUsedDictionarySize <= 0) {
+        // return a dictionary only if we actually used it
+        return null;
+      }
+      PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+      FloatIterator usedEntries = floatDictionaryContent.keySet().iterator();
+      // write only the part of the dict that we used
+      for (int written = 0; written < lastUsedDictionarySize; written++) {
+        dictionaryEncoder.writeFloat(usedEntries.nextFloat());
+      }
+      return dictPage(dictionaryEncoder);
+    }
+
+    @Override
+    public int getDictionarySize() {
+      return floatDictionaryContent.size();
+    }
+
+    @Override
+    protected void clearDictionaryContent() {
+      floatDictionaryContent.clear();
+    }
+
+    /**
+     * Writes every buffered value to the given writer, resolving ids through a reverse dictionary.
+     */
+    @Override
+    public void fallBackDictionaryEncodedData(ValuesWriter writer) {
+      //build reverse dictionary
+      final float[] valueById = new float[getDictionarySize()];
+      for (Float2IntMap.Entry dictionaryEntry : floatDictionaryContent.float2IntEntrySet()) {
+        valueById[dictionaryEntry.getIntValue()] = dictionaryEntry.getFloatKey();
+      }
+
+      //fall back to plain encoding
+      for (IntIterator ids = encodedValues.iterator(); ids.hasNext(); ) {
+        writer.writeFloat(valueById[ids.next()]);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java
new file mode 100644
index 0000000..3201072
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/IntList.java
@@ -0,0 +1,123 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An append-only list of ints.
+ * Values are stored in fixed size int[] slabs, which avoids autoboxing
+ * and never resizes an existing buffer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class IntList {
+
+  /* number of ints held by each slab */
+  private static final int SLAB_SIZE = 64 * 1024;
+
+  /**
+   * Iterates over the content of the list.
+   * Deliberately not a java.util.Iterator, to avoid autoboxing.
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static class IntIterator {
+
+    private final int[][] slabs;
+    private final int count;
+    private int current;
+
+    /**
+     * slabs will be iterated in order up to the provided count
+     * as the last slab may not be full
+     * @param slabs contain the ints
+     * @param count total count of ints
+     */
+    public IntIterator(int[][] slabs, int count) {
+      this.count = count;
+      this.slabs = slabs;
+    }
+
+    /**
+     * @return whether there is a next value
+     */
+    public boolean hasNext() {
+      return current < count;
+    }
+
+    /**
+     * No bound check is done against count: callers are expected to check hasNext() first.
+     *
+     * @return the next int
+     */
+    public int next() {
+      final int[] slab = slabs[current / SLAB_SIZE];
+      final int value = slab[current % SLAB_SIZE];
+      current++;
+      return value;
+    }
+
+  }
+
+  /* the slabs that are already full, in insertion order */
+  private final List<int[]> slabs = new ArrayList<int[]>();
+  /* the slab currently being filled */
+  private int[] currentSlab;
+  /* index of the next free position in currentSlab */
+  private int currentSlabPos;
+
+  /**
+   * construct an empty list
+   */
+  public IntList() {
+    initSlab();
+  }
+
+  /* starts a new empty slab */
+  private void initSlab() {
+    currentSlabPos = 0;
+    currentSlab = new int[SLAB_SIZE];
+  }
+
+  /**
+   * @param i value to append to the end of the list
+   */
+  public void add(int i) {
+    final boolean currentSlabIsFull = currentSlabPos == currentSlab.length;
+    if (currentSlabIsFull) {
+      slabs.add(currentSlab);
+      initSlab();
+    }
+    currentSlab[currentSlabPos++] = i;
+  }
+
+  /**
+   * (not an actual Iterable)
+   * @return an IntIterator on the content
+   */
+  public IntIterator iterator() {
+    final int fullSlabCount = slabs.size();
+    final int[][] allSlabs = new int[fullSlabCount + 1][];
+    for (int s = 0; s < fullSlabCount; s++) {
+      allSlabs[s] = slabs.get(s);
+    }
+    // the slab being filled comes last and may be only partially used
+    allSlabs[fullSlabCount] = currentSlab;
+    return new IntIterator(allSlabs, SLAB_SIZE * fullSlabCount + currentSlabPos);
+  }
+
+  /**
+   * @return the current size of the list
+   */
+  public int size() {
+    final int valuesInFullSlabs = SLAB_SIZE * slabs.size();
+    return valuesInFullSlabs + currentSlabPos;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
new file mode 100644
index 0000000..055dd73
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -0,0 +1,310 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.dictionary;
+
+import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.PLAIN;
+
+import java.io.IOException;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * a simple implementation of dictionary for plain encoded values
+ *
+ */
+public abstract class PlainValuesDictionary extends Dictionary {
+
+  /**
+   * @param dictionaryPage the PLAIN encoded content of the dictionary
+   * @throws IOException
+   */
+  protected PlainValuesDictionary(DictionaryPage dictionaryPage) throws IOException {
+    super(dictionaryPage.getEncoding());
+    if (dictionaryPage.getEncoding() != PLAIN_DICTIONARY
+        && dictionaryPage.getEncoding() != PLAIN) {
+      throw new ParquetDecodingException("Dictionary data encoding type not supported: " + dictionaryPage.getEncoding());
+    }
+  }
+
+  /**
+   * a simple implementation of dictionary for plain encoded binary
+   */
+  public static class PlainBinaryDictionary extends PlainValuesDictionary {
+
+    private Binary[] binaryDictionaryContent = null;
+
+    /**
+     * Decodes {@link Binary} values from a {@link DictionaryPage}.
+     *
+     * Values are read as length-prefixed values with a 4-byte little-endian
+     * length.
+     *
+     * @param dictionaryPage a {@code DictionaryPage} of encoded binary values
+     * @throws IOException
+     */
+    public PlainBinaryDictionary(DictionaryPage dictionaryPage) throws IOException {
+      this(dictionaryPage, null);
+    }
+
+    /**
+     * Decodes {@link Binary} values from a {@link DictionaryPage}.
+     *
+     * If the given {@code length} is null, the values will be read as length-
+     * prefixed values with a 4-byte little-endian length. If length is not
+     * null, it will be used as the length for all fixed-length {@code Binary}
+     * values read from the page.
+     *
+     * @param dictionaryPage a {@code DictionaryPage} of encoded binary values
+     * @param length a fixed length of binary arrays (must be greater than 0), or null if not fixed
+     * @throws IOException if the dictionary page bytes cannot be read
+     */
+    public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) throws IOException {
+      super(dictionaryPage);
+      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      binaryDictionaryContent = new Binary[dictionaryPage.getDictionarySize()];
+      int offset = 0;
+      if (length == null) {
+        // dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes
+        for (int i = 0; i < binaryDictionaryContent.length; i++) {
+          int len = readIntLittleEndian(dictionaryBytes, offset);
+          // move past the 4-byte length prefix that was just read
+          offset += 4;
+          // wrap the content in a Binary
+          binaryDictionaryContent[i] = Binary.fromByteArray(dictionaryBytes, offset, len);
+          // increment to the next value
+          offset += len;
+        }
+      } else {
+        // dictionary values are stored as fixed-length arrays
+        Preconditions.checkArgument(length > 0,
+            "Invalid byte array length: " + length);
+        for (int i = 0; i < binaryDictionaryContent.length; i++) {
+          // wrap the content in a Binary
+          binaryDictionaryContent[i] = Binary.fromByteArray(
+              dictionaryBytes, offset, length);
+          // increment to the next value
+          offset += length;
+        }
+      }
+    }
+
+    @Override
+    public Binary decodeToBinary(int id) {
+      return binaryDictionaryContent[id];
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("PlainBinaryDictionary {\n");
+      for (int i = 0; i < binaryDictionaryContent.length; i++) {
+        sb.append(i).append(" => ").append(binaryDictionaryContent[i]).append("\n");
+      }
+      return sb.append("}").toString();
+    }
+
+    @Override
+    public int getMaxId() {
+      return binaryDictionaryContent.length - 1;
+    }
+
+  }
+
+  /**
+   * a simple implementation of dictionary for plain encoded long values
+   */
+  public static class PlainLongDictionary extends PlainValuesDictionary {
+
+    private long[] longDictionaryContent = null;
+
+    /**
+     * @param dictionaryPage the dictionary page whose bytes are decoded into long values with a LongPlainValuesReader
+     * @throws IOException if the dictionary page bytes cannot be read
+     */
+    public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
+      super(dictionaryPage);
+      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      longDictionaryContent = new long[dictionaryPage.getDictionarySize()];
+      LongPlainValuesReader longReader = new LongPlainValuesReader();
+      longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      for (int i = 0; i < longDictionaryContent.length; i++) {
+        longDictionaryContent[i] = longReader.readLong();
+      }
+    }
+
+    @Override
+    public long decodeToLong(int id) {
+      return longDictionaryContent[id];
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("PlainLongDictionary {\n");
+      for (int i = 0; i < longDictionaryContent.length; i++) {
+        sb.append(i).append(" => ").append(longDictionaryContent[i]).append("\n");
+      }
+      return sb.append("}").toString();
+    }
+
+    @Override
+    public int getMaxId() {
+      return longDictionaryContent.length - 1;
+    }
+
+  }
+
+  /**
+   * a simple implementation of dictionary for plain encoded double values
+   */
+  public static class PlainDoubleDictionary extends PlainValuesDictionary {
+
+    private double[] doubleDictionaryContent = null;
+
+    /**
+     * @param dictionaryPage the dictionary page whose bytes are decoded into double values with a DoublePlainValuesReader
+     * @throws IOException if the dictionary page bytes cannot be read
+     */
+    public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException {
+      super(dictionaryPage);
+      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()];
+      DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
+      doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      for (int i = 0; i < doubleDictionaryContent.length; i++) {
+        doubleDictionaryContent[i] = doubleReader.readDouble();
+      }
+    }
+
+    @Override
+    public double decodeToDouble(int id) {
+      return doubleDictionaryContent[id];
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("PlainDoubleDictionary {\n");
+      for (int i = 0; i < doubleDictionaryContent.length; i++) {
+        sb.append(i).append(" => ").append(doubleDictionaryContent[i]).append("\n");
+      }
+      return sb.append("}").toString();
+    }
+
+    @Override
+    public int getMaxId() {
+      return doubleDictionaryContent.length - 1;
+    }
+
+  }
+
+  /**
+   * a simple implementation of dictionary for plain encoded integer values
+   */
+  public static class PlainIntegerDictionary extends PlainValuesDictionary {
+
+    private int[] intDictionaryContent = null;
+
+    /**
+     * @param dictionaryPage the dictionary page whose bytes are decoded into int values with an IntegerPlainValuesReader
+     * @throws IOException if the dictionary page bytes cannot be read
+     */
+    public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException {
+      super(dictionaryPage);
+      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      intDictionaryContent = new int[dictionaryPage.getDictionarySize()];
+      IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
+      intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      for (int i = 0; i < intDictionaryContent.length; i++) {
+        intDictionaryContent[i] = intReader.readInteger();
+      }
+    }
+
+    @Override
+    public int decodeToInt(int id) {
+      return intDictionaryContent[id];
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("PlainIntegerDictionary {\n");
+      for (int i = 0; i < intDictionaryContent.length; i++) {
+        sb.append(i).append(" => ").append(intDictionaryContent[i]).append("\n");
+      }
+      return sb.append("}").toString();
+    }
+
+    @Override
+    public int getMaxId() {
+      return intDictionaryContent.length - 1;
+    }
+
+  }
+
+  /**
+   * a simple implementation of dictionary for plain encoded float values
+   */
+  public static class PlainFloatDictionary extends PlainValuesDictionary {
+
+    private float[] floatDictionaryContent = null;
+
+    /**
+     * @param dictionaryPage the dictionary page whose bytes are decoded into float values with a FloatPlainValuesReader
+     * @throws IOException if the dictionary page bytes cannot be read
+     */
+    public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException {
+      super(dictionaryPage);
+      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      floatDictionaryContent = new float[dictionaryPage.getDictionarySize()];
+      FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
+      floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      for (int i = 0; i < floatDictionaryContent.length; i++) {
+        floatDictionaryContent[i] = floatReader.readFloat();
+      }
+    }
+
+    @Override
+    public float decodeToFloat(int id) {
+      return floatDictionaryContent[id];
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("PlainFloatDictionary {\n");
+      for (int i = 0; i < floatDictionaryContent.length; i++) {
+        sb.append(i).append(" => ").append(floatDictionaryContent[i]).append("\n");
+      }
+      return sb.append("}").toString();
+    }
+
+    @Override
+    public int getMaxId() {
+      return floatDictionaryContent.length - 1;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
new file mode 100644
index 0000000..f66c7c9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
@@ -0,0 +1,190 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.fallback;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.RequiresFallback;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> extends ValuesWriter {
+
+  public static <I extends ValuesWriter & RequiresFallback, F extends ValuesWriter> FallbackValuesWriter<I, F> of(I initialWriter, F fallBackWriter) {
+    return new FallbackValuesWriter<I, F>(initialWriter, fallBackWriter);
+  }
+
+  /** writer to start with */
+  public final I initialWriter;
+  /** fallback */
+  public final F fallBackWriter;
+
+  private boolean fellBackAlready = false;
+
+  /** writer currently written to */
+  private ValuesWriter currentWriter;
+
+  private boolean initialUsedAndHadDictionary = false;
+
+  /* Size of the raw data. Using a dictionary has no effect on this value. It is used to decide
+   * whether falling back to plain encoding is better, by comparing rawDataByteSize with the encoded data size.
+   * It is also returned by getBufferedSize, so the page is written based on the raw data size.
+   */
+  private long rawDataByteSize = 0;
+
+  /** indicates if this is the first page being processed */
+  private boolean firstPage = true;
+
+  public FallbackValuesWriter(I initialWriter, F fallBackWriter) {
+    super();
+    this.initialWriter = initialWriter;
+    this.fallBackWriter = fallBackWriter;
+    this.currentWriter = initialWriter;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    // the raw data size is used to decide when to flush the page,
+    // so the actual size of the page written could be much smaller
+    // due to dictionary encoding. This prevents the page from being too big when fallback happens.
+    return rawDataByteSize;
+  }
+
+  @Override
+  public BytesInput getBytes() {
+    if (!fellBackAlready && firstPage) {
+      // we use the first page to decide if we're going to use this encoding
+      BytesInput bytes = initialWriter.getBytes();
+      if (!initialWriter.isCompressionSatisfying(rawDataByteSize, bytes.size())) {
+        fallBack();
+      } else {
+        return bytes;
+      }
+    }
+    return currentWriter.getBytes();
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    Encoding encoding = currentWriter.getEncoding();
+    if (!fellBackAlready && !initialUsedAndHadDictionary) {
+      initialUsedAndHadDictionary = encoding.usesDictionary();
+    }
+    return encoding;
+  }
+
+  @Override
+  public void reset() {
+    rawDataByteSize = 0;
+    firstPage = false;
+    currentWriter.reset();
+  }
+
+  public DictionaryPage createDictionaryPage() {
+    if (initialUsedAndHadDictionary) {
+      return initialWriter.createDictionaryPage();
+    } else {
+      return currentWriter.createDictionaryPage();
+    }
+  }
+
+  public void resetDictionary() {
+    if (initialUsedAndHadDictionary) {
+      initialWriter.resetDictionary();
+    } else {
+      currentWriter.resetDictionary();
+    }
+    currentWriter = initialWriter;
+    fellBackAlready = false;
+    initialUsedAndHadDictionary = false;
+    firstPage = true;
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return currentWriter.getAllocatedSize();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return String.format(
+        "%s FallbackValuesWriter{\n"
+          + "%s\n"
+          + "%s\n"
+        + "%s}\n",
+        prefix,
+        initialWriter.memUsageString(prefix + " initial:"),
+        fallBackWriter.memUsageString(prefix + " fallback:"),
+        prefix
+        );
+  }
+
+  private void checkFallback() {
+    if (!fellBackAlready && initialWriter.shouldFallBack()) {
+      fallBack();
+    }
+  }
+
+  private void fallBack() {
+    fellBackAlready = true;
+    initialWriter.fallBackAllValuesTo(fallBackWriter);
+    currentWriter = fallBackWriter;
+  }
+
+  // passthrough writing the value
+
+  public void writeByte(int value) {
+    rawDataByteSize += 1;
+    currentWriter.writeByte(value);
+    checkFallback();
+  }
+
+  public void writeBytes(Binary v) {
+    // raw size of a binary value: a 4-byte int length followed by the binary content itself
+    rawDataByteSize += v.length() + 4;
+    currentWriter.writeBytes(v);
+    checkFallback();
+  }
+
+  public void writeInteger(int v) {
+    rawDataByteSize += 4;
+    currentWriter.writeInteger(v);
+    checkFallback();
+  }
+
+  public void writeLong(long v) {
+    rawDataByteSize += 8;
+    currentWriter.writeLong(v);
+    checkFallback();
+  }
+
+  public void writeFloat(float v) {
+    rawDataByteSize += 4;
+    currentWriter.writeFloat(v);
+    checkFallback();
+  }
+
+  public void writeDouble(double v) {
+    rawDataByteSize += 8;
+    currentWriter.writeDouble(v);
+    checkFallback();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
new file mode 100644
index 0000000..f567803
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -0,0 +1,70 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+public class BinaryPlainValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(BinaryPlainValuesReader.class);
+  private byte[] in; // page bytes, set by initFromPage
+  private int offset; // position in 'in' of the next value's 4-byte length prefix
+
+  @Override
+  public Binary readBytes() {
+    try {
+      int length = BytesUtils.readIntLittleEndian(in, offset); // little-endian int length prefix
+      int start = offset + 4; // value bytes start right after the length prefix
+      offset = start + length; // advance past this value before wrapping it
+      return Binary.fromByteArray(in, start, length);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+    } catch (RuntimeException e) { // NOTE: offset may already have been advanced when this is reported
+      throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+    }
+  }
+
+  @Override
+  public void skip() {
+    try {
+      int length = BytesUtils.readIntLittleEndian(in, offset); // length prefix of the value to skip
+      offset += 4 + length; // jump over the prefix and the value bytes
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+    } catch (RuntimeException e) {
+      throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+    }
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset)
+      throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    this.in = in; // valueCount is not used by this reader
+    this.offset = offset;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
new file mode 100644
index 0000000..31e711f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -0,0 +1,75 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
+
+/**
+ * decodes booleans for the plain encoding: one bit at a time (0 = false)
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BooleanPlainValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(BooleanPlainValuesReader.class);
+
+  private ByteBitPackingValuesReader in = new ByteBitPackingValuesReader(1, LITTLE_ENDIAN);
+
+  /**
+   * Reads one value from the bit-packing reader and maps 0 to false, anything else to true.
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesReader#readBoolean()
+   */
+  @Override
+  public boolean readBoolean() {
+    return in.readInteger() == 0 ? false : true;
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesReader#skip()
+   */
+  @Override
+  public void skip() {
+    in.readInteger();
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, byte[], int)
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    this.in.initFromPage(valueCount, in, offset);
+  }
+  
+  @Override
+  public int getNextOffset() {
+    return this.in.getNextOffset();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
new file mode 100644
index 0000000..78920f0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
@@ -0,0 +1,78 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesWriter;
+
+
+/**
+ * An implementation of the PLAIN encoding for booleans: true is written as 1 and false as 0
+ * through a bit-packing writer, to which all size, bytes and reset calls are delegated.
+ * @author Julien Le Dem
+ *
+ */
+public class BooleanPlainValuesWriter extends ValuesWriter {
+
+  private ByteBitPackingValuesWriter bitPackingWriter;
+
+  public BooleanPlainValuesWriter() {
+    bitPackingWriter = new ByteBitPackingValuesWriter(1, LITTLE_ENDIAN);
+  }
+
+  @Override
+  public final void writeBoolean(boolean v) {
+    bitPackingWriter.writeInteger(v ? 1 : 0);
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return bitPackingWriter.getBufferedSize();
+  }
+
+  @Override
+  public BytesInput getBytes() {
+    return bitPackingWriter.getBytes();
+  }
+
+  @Override
+  public void reset() {
+    bitPackingWriter.reset();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return bitPackingWriter.getAllocatedSize();
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return PLAIN;
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return bitPackingWriter.memUsageString(prefix);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
new file mode 100644
index 0000000..3a7d245
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -0,0 +1,67 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import java.io.IOException;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+import static org.apache.parquet.Log.DEBUG;
+
+/**
+ * ValuesReader for FIXED_LEN_BYTE_ARRAY: each value is read as exactly {@code length} bytes from the
+ * current offset, with no per-value length prefix; {@code length} is set by the constructor.
+ * @author David Z. Chen <dc...@linkedin.com>
+ */
+public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesReader.class);
+  private byte[] in;
+  private int offset;
+  private int length;
+
+  public FixedLenByteArrayPlainValuesReader(int length) {
+    this.length = length;
+  }
+
+  @Override
+  public Binary readBytes() {
+    try {
+      int start = offset;
+      offset = start + length;
+      return Binary.fromByteArray(in, start, length);
+    } catch (RuntimeException e) {
+      throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+    }
+  }
+
+  @Override
+  public void skip() {
+    offset += length;
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset)
+      throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    this.in = in;
+    this.offset = offset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
new file mode 100644
index 0000000..986ae0b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
@@ -0,0 +1,98 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.plain;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * ValuesWriter for FIXED_LEN_BYTE_ARRAY.
+ *
+ * <p>Accepts only values whose byte length equals the length given at
+ * construction, buffers them in memory, and reports {@link Encoding#PLAIN}.
+ *
+ * @author David Z. Chen <dc...@linkedin.com>
+ */
+public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
+  // Logs under this class's own name (previously attributed to PlainValuesWriter).
+  private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesWriter.class);
+
+  /** In-memory buffer that accumulates the written bytes. */
+  private final CapacityByteArrayOutputStream arrayOut;
+  /** Stream wrapper around {@link #arrayOut} that values are written through. */
+  private final LittleEndianDataOutputStream out;
+  /** Required byte length of every value. */
+  private final int length;
+
+  /**
+   * @param length required byte length of every value passed to {@link #writeBytes}
+   * @param initialSize forwarded to the CapacityByteArrayOutputStream constructor
+   *        (presumably its initial buffer size; confirm against that class)
+   * @param pageSize forwarded to the CapacityByteArrayOutputStream constructor
+   *        (presumably a sizing hint tied to the page size; confirm against that class)
+   */
+  public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) {
+    this.length = length;
+    this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+    this.out = new LittleEndianDataOutputStream(arrayOut);
+  }
+
+  /**
+   * Appends one value to the buffer.
+   *
+   * @param v value to write; its length must equal the configured length
+   * @throws IllegalArgumentException if {@code v.length()} differs from the configured length
+   * @throws ParquetEncodingException if writing to the underlying stream fails
+   */
+  @Override
+  public final void writeBytes(Binary v) {
+    if (v.length() != length) {
+      throw new IllegalArgumentException("Fixed Binary size " + v.length() +
+          " does not match field type length " + length);
+    }
+    try {
+      v.writeTo(out);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write fixed bytes", e);
+    }
+  }
+
+  /**
+   * @return the current size reported by the in-memory buffer
+   */
+  @Override
+  public long getBufferedSize() {
+    return arrayOut.size();
+  }
+
+  /**
+   * Flushes the stream wrapper and exposes the buffered bytes.
+   *
+   * @return a BytesInput created from the in-memory buffer
+   * @throws ParquetEncodingException if the flush fails
+   */
+  @Override
+  public BytesInput getBytes() {
+    try {
+      out.flush();
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page", e);
+    }
+    if (Log.DEBUG) {
+      LOG.debug("writing a buffer of size " + arrayOut.size());
+    }
+    return BytesInput.from(arrayOut);
+  }
+
+  /**
+   * Resets the in-memory buffer.
+   */
+  @Override
+  public void reset() {
+    arrayOut.reset();
+  }
+
+  /**
+   * @return the capacity reported by the in-memory buffer
+   */
+  @Override
+  public long getAllocatedSize() {
+    return arrayOut.getCapacity();
+  }
+
+  /**
+   * @return {@link Encoding#PLAIN}
+   */
+  @Override
+  public Encoding getEncoding() {
+    return Encoding.PLAIN;
+  }
+
+  /**
+   * @param prefix text prepended to the usage description
+   * @return the buffer's memory usage string, labelled with {@code prefix + " PLAIN"}
+   */
+  @Override
+  public String memUsageString(String prefix) {
+    return arrayOut.memUsageString(prefix + " PLAIN");
+  }
+}