Posted to commits@parquet.apache.org by bl...@apache.org on 2016/04/21 20:43:11 UTC

[1/2] parquet-mr git commit: PARQUET-225: Add support for INT64 delta encoding.

Repository: parquet-mr
Updated Branches:
  refs/heads/master 741944332 -> 8bcfe6c55


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
index 8df5f39..b7dc26b 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
@@ -22,10 +22,10 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Random;
 
 import org.junit.Assert;
 import org.junit.Test;
-
 import org.apache.parquet.Log;
 import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingReader;
 import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
@@ -46,6 +46,24 @@ public class TestByteBitPacking {
       Assert.assertArrayEquals("width "+i, values, unpacked);
     }
   }
+  
+  @Test
+  public void testPackUnPackLong() {
+    LOG.debug("");
+    LOG.debug("testPackUnPackLong");
+    for (int i = 1; i < 64; i++) {
+      LOG.debug("Width: " + i);
+      long[] unpacked32 = new long[32];
+      long[] unpacked8 = new long[32];
+      long[] values = generateValuesLong(i);
+      packUnpack32(Packer.BIG_ENDIAN.newBytePackerForLong(i), values, unpacked32);
+      LOG.debug("Output 32: " + TestBitPacking.toString(unpacked32));
+      Assert.assertArrayEquals("width "+i, values, unpacked32);
+      packUnpack8(Packer.BIG_ENDIAN.newBytePackerForLong(i), values, unpacked8);
+      LOG.debug("Output 8: " + TestBitPacking.toString(unpacked8));
+      Assert.assertArrayEquals("width "+i, values, unpacked8);
+    }
+  }
 
   private void packUnpack(BytePacker packer, int[] values, int[] unpacked) {
     byte[] packed = new byte[packer.getBitWidth() * 4];
@@ -54,6 +72,24 @@ public class TestByteBitPacking {
     packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
   }
 
+  private void packUnpack32(BytePackerForLong packer, long[] values, long[] unpacked) {
+    byte[] packed = new byte[packer.getBitWidth() * 4];
+    packer.pack32Values(values, 0, packed, 0);
+    LOG.debug("packed: " + TestBitPacking.toString(packed));
+    packer.unpack32Values(packed, 0, unpacked, 0);
+  }
+
+  private void packUnpack8(BytePackerForLong packer, long[] values, long[] unpacked) {
+    byte[] packed = new byte[packer.getBitWidth() * 4];
+    for (int i = 0; i < 4; i++) {
+      packer.pack8Values(values,  8 * i, packed, packer.getBitWidth() * i);
+    }
+    LOG.debug("packed: " + TestBitPacking.toString(packed));
+    for (int i = 0; i < 4; i++) {
+      packer.unpack8Values(packed, packer.getBitWidth() * i, unpacked, 8 * i);
+    }
+  }
+
   private int[] generateValues(int bitWidth) {
     int[] values = new int[32];
     for (int j = 0; j < values.length; j++) {
@@ -63,6 +99,16 @@ public class TestByteBitPacking {
     return values;
   }
 
+  private long[] generateValuesLong(int bitWidth) {
+    long[] values = new long[32];
+    Random random = new Random(0);
+    for (int j = 0; j < values.length; j++) {
+      values[j] = random.nextLong() & ((1L << bitWidth) - 1L);
+    }
+    LOG.debug("Input:  " + TestBitPacking.toString(values));
+    return values;
+  }
+
   @Test
   public void testPackUnPackAgainstHandWritten() throws IOException {
     LOG.debug("");

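For a quick sanity check outside the test above, the new long packers can be driven directly. This is a minimal sketch that assumes only what the test exercises: Packer.BIG_ENDIAN.newBytePackerForLong(bitWidth) plus the pack8Values/unpack8Values byte[] overloads.

import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
import org.apache.parquet.column.values.bitpacking.Packer;

public class BytePackerForLongRoundTrip {
  public static void main(String[] args) {
    int bitWidth = 5;  // every value below fits in 5 bits
    BytePackerForLong packer = Packer.BIG_ENDIAN.newBytePackerForLong(bitWidth);

    long[] values = {1, 2, 3, 4, 5, 6, 7, 8};  // pack8Values always consumes 8 values
    byte[] packed = new byte[bitWidth];        // 8 values * 5 bits = 5 bytes
    packer.pack8Values(values, 0, packed, 0);

    long[] unpacked = new long[8];
    packer.unpack8Values(packed, 0, unpacked, 0);
    // unpacked is now identical to values
  }
}
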
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
index 3d182e2..b4868e9 100644
--- a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
+++ b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
@@ -27,23 +27,40 @@ import java.io.IOException;
  * This class generates bit packers that pack the most significant bit first.
  * The result of the generation is checked in. To regenerate the code run this class and check in the result.
  *
- * TODO: remove the unnecessary masks for perf
- *
  * @author Julien Le Dem
  *
  */
 public class ByteBasedBitPackingGenerator {
 
-  private static final String CLASS_NAME_PREFIX = "ByteBitPacking";
-  private static final int PACKER_COUNT = 32;
+  private static final String CLASS_NAME_PREFIX_FOR_INT = "ByteBitPacking";
+  private static final String CLASS_NAME_PREFIX_FOR_LONG = "ByteBitPackingForLong";
+  private static final String VARIABLE_TYPE_FOR_INT = "int";
+  private static final String VARIABLE_TYPE_FOR_LONG = "long";
+  private static final int MAX_BITS_FOR_INT = 32;
+  private static final int MAX_BITS_FOR_LONG = 64;
 
   public static void main(String[] args) throws Exception {
     String basePath = args[0];
-    generateScheme(CLASS_NAME_PREFIX + "BE", true, basePath);
-    generateScheme(CLASS_NAME_PREFIX + "LE", false, basePath);
+    // Int for Big Endian
+    generateScheme(false, true, basePath);
+
+    // Int for Little Endian
+    generateScheme(false, false, basePath);
+
+    // Long for Big Endian
+    generateScheme(true, true, basePath);
+
+    // Long for Little Endian
+    generateScheme(true, false, basePath);
   }
 
-  private static void generateScheme(String className, boolean msbFirst, String basePath) throws IOException {
+  private static void generateScheme(boolean isLong, boolean msbFirst, 
+      String basePath) throws IOException {
+    String baseClassName = isLong ? CLASS_NAME_PREFIX_FOR_LONG : CLASS_NAME_PREFIX_FOR_INT;
+    String className = msbFirst ? (baseClassName + "BE") : (baseClassName + "LE");
+    int maxBits = isLong ? MAX_BITS_FOR_LONG : MAX_BITS_FOR_INT;
+    String nameSuffix = isLong ? "ForLong" : "";
+    
     final File file = new File(basePath + "/org/apache/parquet/column/values/bitpacking/" + className + ".java").getAbsoluteFile();
     if (!file.getParentFile().exists()) {
       file.getParentFile().mkdirs();
@@ -65,48 +82,58 @@ public class ByteBasedBitPackingGenerator {
     fw.append(" */\n");
     fw.append("public abstract class " + className + " {\n");
     fw.append("\n");
-    fw.append("  private static final BytePacker[] packers = new BytePacker[33];\n");
+    fw.append("  private static final BytePacker" + nameSuffix + "[] packers = new BytePacker" + nameSuffix + "[" + (maxBits + 1) + "];\n");
     fw.append("  static {\n");
-    for (int i = 0; i <= PACKER_COUNT; i++) {
+    for (int i = 0; i <= maxBits; i++) {
       fw.append("    packers[" + i + "] = new Packer" + i + "();\n");
     }
     fw.append("  }\n");
     fw.append("\n");
-    fw.append("  public static final BytePackerFactory factory = new BytePackerFactory() {\n");
-    fw.append("    public BytePacker newBytePacker(int bitWidth) {\n");
+    fw.append("  public static final BytePacker" + nameSuffix + "Factory factory = new BytePacker" + nameSuffix + "Factory() {\n");
+    fw.append("    public BytePacker" + nameSuffix + " newBytePacker" + nameSuffix + "(int bitWidth) {\n");
     fw.append("      return packers[bitWidth];\n");
     fw.append("    }\n");
     fw.append("  };\n");
     fw.append("\n");
-    for (int i = 0; i <= PACKER_COUNT; i++) {
-      generateClass(fw, i, msbFirst);
+    for (int i = 0; i <= maxBits; i++) {
+      generateClass(fw, i, isLong, msbFirst);
       fw.append("\n");
     }
     fw.append("}\n");
     fw.close();
   }
 
-  private static void generateClass(FileWriter fw, int bitWidth, boolean msbFirst) throws IOException {
-    fw.append("  private static final class Packer" + bitWidth + " extends BytePacker {\n");
+  private static void generateClass(FileWriter fw, int bitWidth, boolean isLong, boolean msbFirst) throws IOException {
+    String nameSuffix = isLong ? "ForLong" : "";
+    fw.append("  private static final class Packer" + bitWidth + " extends BytePacker" + nameSuffix + " {\n");
     fw.append("\n");
     fw.append("    private Packer" + bitWidth + "() {\n");
     fw.append("      super("+bitWidth+");\n");
     fw.append("    }\n");
     fw.append("\n");
     // Packing
-    generatePack(fw, bitWidth, 1, msbFirst);
-    generatePack(fw, bitWidth, 4, msbFirst);
+    generatePack(fw, bitWidth, 1, isLong, msbFirst);
+    generatePack(fw, bitWidth, 4, isLong, msbFirst);
 
     // Unpacking
-    generateUnpack(fw, bitWidth, 1, msbFirst, true);
-    generateUnpack(fw, bitWidth, 1, msbFirst, false);
-    generateUnpack(fw, bitWidth, 4, msbFirst, true);
-    generateUnpack(fw, bitWidth, 4, msbFirst, false);
+    generateUnpack(fw, bitWidth, 1, isLong, msbFirst, true);
+    generateUnpack(fw, bitWidth, 1, isLong, msbFirst, false);
+    generateUnpack(fw, bitWidth, 4, isLong, msbFirst, true);
+    generateUnpack(fw, bitWidth, 4, isLong, msbFirst, false);
 
     fw.append("  }\n");
   }
-
-  private static int getShift(FileWriter fw, int bitWidth, boolean msbFirst,
+  
+  private static class ShiftMask {
+    ShiftMask(int shift, long mask) {
+      this.shift = shift;
+      this.mask = mask;
+    }
+    public int shift;
+    public long mask;
+  }
+  
+  private static ShiftMask getShift(FileWriter fw, int bitWidth, boolean isLong, boolean msbFirst,
       int byteIndex, int valueIndex) throws IOException {
     // relative positions of the start and end of the value to the start and end of the byte
     int valueStartBitIndex = (valueIndex * bitWidth) - (8 * (byteIndex));
@@ -120,6 +147,7 @@ public class ByteBasedBitPackingGenerator {
     int byteEndBitWanted;
 
     int shift;
+    int widthWanted;
 
     if (msbFirst) {
       valueStartBitWanted = valueStartBitIndex < 0 ? bitWidth - 1 + valueStartBitIndex : bitWidth - 1;
@@ -127,13 +155,17 @@ public class ByteBasedBitPackingGenerator {
       byteStartBitWanted = valueStartBitIndex < 0 ? 8 : 7 - valueStartBitIndex;
       byteEndBitWanted = valueEndBitIndex > 0 ? 0 : -valueEndBitIndex;
       shift = valueEndBitWanted - byteEndBitWanted;
+      widthWanted = Math.min(7, byteStartBitWanted) - Math.min(7, byteEndBitWanted) + 1;
     } else {
       valueStartBitWanted = bitWidth - 1 - (valueEndBitIndex > 0 ? valueEndBitIndex : 0);
       valueEndBitWanted = bitWidth - 1 - (valueStartBitIndex < 0 ? bitWidth - 1 + valueStartBitIndex : bitWidth - 1);
       byteStartBitWanted = 7 - (valueEndBitIndex > 0 ? 0 : -valueEndBitIndex);
       byteEndBitWanted = 7 - (valueStartBitIndex < 0 ? 8 : 7 - valueStartBitIndex);
       shift = valueStartBitWanted - byteStartBitWanted;
+      widthWanted = Math.max(0, byteStartBitWanted) - Math.max(0, byteEndBitWanted) + 1;
     }
+    
+    int maskWidth = widthWanted + Math.max(0, shift);
 
     visualizeAlignment(
         fw, bitWidth, valueEndBitIndex,
@@ -141,7 +173,7 @@ public class ByteBasedBitPackingGenerator {
         byteStartBitWanted, byteEndBitWanted,
         shift
         );
-    return shift;
+    return new ShiftMask(shift, genMask(maskWidth, isLong));
   }
 
   private static void visualizeAlignment(FileWriter fw, int bitWidth,
@@ -177,9 +209,11 @@ public class ByteBasedBitPackingGenerator {
     fw.append("           ");
   }
 
-  private static void generatePack(FileWriter fw, int bitWidth, int batch, boolean msbFirst) throws IOException {
-    int mask = genMask(bitWidth);
-    fw.append("    public final void pack" + (batch * 8) + "Values(final int[] in, final int inPos, final byte[] out, final int outPos) {\n");
+  private static void generatePack(FileWriter fw, int bitWidth, int batch, boolean isLong, boolean msbFirst) throws IOException {
+    long mask = genMask(bitWidth, isLong);
+    String maskSuffix = isLong ? "L" : "";
+    String variableType = isLong ? VARIABLE_TYPE_FOR_LONG : VARIABLE_TYPE_FOR_INT;
+    fw.append("    public final void pack" + (batch * 8) + "Values(final " + variableType + "[] in, final int inPos, final byte[] out, final int outPos) {\n");
     for (int byteIndex = 0; byteIndex < bitWidth * batch; ++byteIndex) {
       fw.append("      out[" + align(byteIndex, 2) + " + outPos] = (byte)((\n");
       int startIndex = (byteIndex * 8) / bitWidth;
@@ -191,32 +225,31 @@ public class ByteBasedBitPackingGenerator {
         } else {
           fw.append("\n        | ");
         }
-        int shift = getShift(fw, bitWidth, msbFirst, byteIndex, valueIndex);
+        ShiftMask shiftMask = getShift(fw, bitWidth, isLong, msbFirst, byteIndex, valueIndex);
 
         String shiftString = ""; // used when shift == 0
-        if (shift > 0) {
-          shiftString = " >>> " + shift;
-        } else if (shift < 0) {
-          shiftString = " <<  " + ( - shift);
+        if (shiftMask.shift > 0) {
+          shiftString = " >>> " + shiftMask.shift;
+        } else if (shiftMask.shift < 0) {
+          shiftString = " <<  " + ( - shiftMask.shift);
         }
-        fw.append("((in[" + align(valueIndex, 2) + " + inPos] & " + mask + ")" + shiftString + ")");
+        fw.append("((in[" + align(valueIndex, 2) + " + inPos] & " + mask + maskSuffix + ")" + shiftString + ")");
       }
       fw.append(") & 255);\n");
     }
     fw.append("    }\n");
   }
 
-  private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst, boolean useByteArray)
+  private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean isLong, boolean msbFirst, boolean useByteArray)
       throws IOException {
-    final String bufferDataType;
-    if (useByteArray) {
-      bufferDataType = "byte[]";
-    } else {
-      bufferDataType = "ByteBuffer";
-    }
-    fw.append("    public final void unpack" + (batch * 8) + "Values(final " + bufferDataType + " in, final int inPos, final int[] out, final int outPos) {\n");
+    final String variableType = isLong ? VARIABLE_TYPE_FOR_LONG : VARIABLE_TYPE_FOR_INT;
+    final String bufferDataType = useByteArray ? "byte[]" : "ByteBuffer";
+    
+    fw.append("    public final void unpack" + (batch * 8) + "Values(final " + bufferDataType + " in, "
+        + "final int inPos, final " + variableType + "[] out, final int outPos) {\n");
+
     if (bitWidth > 0) {
-      int mask = genMask(bitWidth);
+      String maskSuffix = isLong ? "L" : "";
       for (int valueIndex = 0; valueIndex < (batch * 8); ++valueIndex) {
         fw.append("      out[" + align(valueIndex, 2) + " + outPos] =\n");
 
@@ -229,14 +262,16 @@ public class ByteBasedBitPackingGenerator {
           } else {
             fw.append("\n        | ");
           }
-          int shift = getShift(fw, bitWidth, msbFirst, byteIndex, valueIndex);
+          
+          ShiftMask shiftMask = getShift(fw, bitWidth, isLong, msbFirst, byteIndex, valueIndex);
 
           String shiftString = ""; // when shift == 0
-          if (shift < 0) {
-            shiftString = ">>>  " + (-shift);
-          } else if (shift > 0){
-            shiftString = "<<  " + shift;
+          if (shiftMask.shift < 0) {
+            shiftString = ">>  " + (-shiftMask.shift);
+          } else if (shiftMask.shift > 0){
+            shiftString = "<<  " + shiftMask.shift;
           }
+
           final String byteAccess;
           if (useByteArray) {
             byteAccess = "in[" + align(byteIndex, 2) + " + inPos]";
@@ -244,7 +279,10 @@ public class ByteBasedBitPackingGenerator {
             // use ByteBuffer#get(index) method
             byteAccess = "in.get(" + align(byteIndex, 2) + " + inPos)";
           }
-          fw.append(" (((((int)" + byteAccess + ") & 255) " + shiftString + ") & " + mask + ")");
+
+          // Shift the wanted bits to the least significant position and mask them knowing how many bits to get.
+          fw.append(" ((((" + variableType + ")" + byteAccess + ") " + shiftString +
+              ") & " + shiftMask.mask + maskSuffix + ")");
         }
         fw.append(";\n");
       }
@@ -252,8 +290,14 @@ public class ByteBasedBitPackingGenerator {
     fw.append("    }\n");
   }
 
-  private static int genMask(int bitWidth) {
-    int mask = 0;
+  private static long genMask(int bitWidth, boolean isLong) {
+    int maxBitWidth = isLong ? MAX_BITS_FOR_LONG : MAX_BITS_FOR_INT;
+    if (bitWidth >= maxBitWidth) {
+      // -1 is all ones (11111...1111), covering every bit the type can hold.
+      return -1;
+    }
+    
+    long mask = 0;
     for (int i = 0; i < bitWidth; i++) {
       mask <<= 1;
       mask |= 1;

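In isolation, the mask logic that generateUnpack now relies on behaves as sketched below; this is a standalone copy of the genMask rule shown in the hunk above, not the generator itself.

public class GenMaskSketch {
  // same rule as ByteBasedBitPackingGenerator.genMask(bitWidth, isLong)
  static long genMask(int bitWidth, boolean isLong) {
    int maxBitWidth = isLong ? 64 : 32;
    if (bitWidth >= maxBitWidth) {
      // needed because a Java shift distance is taken mod 32 (or 64),
      // so building a full-width mask with shifts would silently wrap
      return -1;  // all ones
    }
    long mask = 0;
    for (int i = 0; i < bitWidth; i++) {
      mask = (mask << 1) | 1;
    }
    return mask;
  }

  public static void main(String[] args) {
    System.out.println(genMask(3, false));   // 7  (binary 111)
    System.out.println(genMask(32, false));  // -1 (all ones, used as the 32-bit int mask)
    System.out.println(genMask(64, true));   // -1 (all 64 bits set)
  }
}
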

[2/2] parquet-mr git commit: PARQUET-225: Add support for INT64 delta encoding.

Posted by bl...@apache.org.
PARQUET-225: Add support for INT64 delta encoding.

Author: Vassil Lunchev <va...@leanplum.com>

Closes #154 from lunchev:int64 and squashes the following commits:

84a40fe [Vassil Lunchev] INT64 support for Delta Encoding
4389af4 [Vassil Lunchev] splitting delta INT32 and delta INT64
e5e8fe2 [Vassil Lunchev] split delta encoding tests for INT32 and for INT64
eb4383a [Ryan Blue] PARQUET-225: Avoid multiple small copies in delta int/long encoding.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bcfe6c5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bcfe6c5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bcfe6c5

Branch: refs/heads/master
Commit: 8bcfe6c55e2588c1047368b4edbf733d1c1d5381
Parents: 7419443
Author: Vassil Lunchev <va...@leanplum.com>
Authored: Tue Mar 24 19:33:03 2015 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Thu Apr 21 11:37:51 2016 -0700

----------------------------------------------------------------------
 .../org/apache/parquet/column/Encoding.java     |   5 +-
 .../parquet/column/ParquetProperties.java       |   8 +-
 .../delta/DeltaBinaryPackingValuesReader.java   |  26 +-
 .../delta/DeltaBinaryPackingValuesWriter.java   | 158 +----------
 ...eltaBinaryPackingValuesWriterForInteger.java | 199 ++++++++++++++
 .../DeltaBinaryPackingValuesWriterForLong.java  | 201 ++++++++++++++
 .../DeltaLengthByteArrayValuesWriter.java       |   3 +-
 .../deltastrings/DeltaByteArrayWriter.java      |   4 +-
 ...BinaryPackingValuesWriterForIntegerTest.java | 266 +++++++++++++++++++
 ...ltaBinaryPackingValuesWriterForLongTest.java | 263 ++++++++++++++++++
 .../DeltaBinaryPackingValuesWriterTest.java     | 264 ------------------
 .../benchmark/BenchmarkIntegerOutputSize.java   |  12 +-
 .../BenchmarkReadingRandomIntegers.java         |  29 +-
 .../benchmark/RandomWritingBenchmarkTest.java   |  25 +-
 .../org/apache/parquet/bytes/BytesUtils.java    |  36 +++
 .../org/apache/parquet/bytes/BytesInput.java    |  39 ++-
 .../values/bitpacking/BytePackerForLong.java    | 112 ++++++++
 .../bitpacking/BytePackerForLongFactory.java    |  25 ++
 .../column/values/bitpacking/Packer.java        |  24 +-
 .../values/bitpacking/TestBitPacking.java       |  14 +
 .../values/bitpacking/TestByteBitPacking.java   |  48 +++-
 .../ByteBasedBitPackingGenerator.java           | 144 ++++++----
 22 files changed, 1397 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
index 87bc798..0a24e76 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column;
 import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 
@@ -163,8 +164,8 @@ public enum Encoding {
   DELTA_BINARY_PACKED {
     @Override
     public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
-      if(descriptor.getType() != INT32) {
-        throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for type INT32");
+      if(descriptor.getType() != INT32 && descriptor.getType() != INT64) {
+        throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for types INT32 and INT64");
       }
       return new DeltaBinaryPackingValuesReader();
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 0c07d54..9ed7736 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -32,7 +32,8 @@ import org.apache.parquet.column.impl.ColumnWriteStoreV2;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.boundedint.DevNullValuesWriter;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
 import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
 import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
 import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
@@ -212,9 +213,10 @@ public class ParquetProperties {
       case FIXED_LEN_BYTE_ARRAY:
         return new DeltaByteArrayWriter(initialSlabSize, pageSizeThreshold, allocator);
       case INT32:
-        return new DeltaBinaryPackingValuesWriter(initialSlabSize, pageSizeThreshold, allocator);
-      case INT96:
+        return new DeltaBinaryPackingValuesWriterForInteger(initialSlabSize, pageSizeThreshold, allocator);
       case INT64:
+        return new DeltaBinaryPackingValuesWriterForLong(initialSlabSize, pageSizeThreshold, allocator);
+      case INT96:
       case DOUBLE:
       case FLOAT:
         return plainWriter(path);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index 3f92deb..a3355d2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -18,11 +18,13 @@
  */
 package org.apache.parquet.column.values.delta;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
 import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 
@@ -40,12 +42,12 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    * values read by the caller
    */
   private int valuesRead;
-  private int minDeltaInCurrentBlock;
+  private long minDeltaInCurrentBlock;
   private ByteBuffer page;
   /**
    * stores the decoded values including the first value which is written to the header
    */
-  private int[] valuesBuffer;
+  private long[] valuesBuffer;
   /**
    * values loaded to the buffer, it could be bigger than the totalValueCount
    * when data is not aligned to mini block, which means padding 0s are in the buffer
@@ -74,7 +76,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
     bitWidths = new int[config.miniBlockNumInABlock];
 
     //read first value from header
-    valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarInt(in);
+    valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarLong(in);
 
     while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
       loadNewBlockToBuffer();
@@ -94,7 +96,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
   private void allocateValuesBuffer() {
     int totalMiniBlockCount = (int) Math.ceil((double) totalValueCount / config.miniBlockSizeInValues);
     //+ 1 because first value written to header is also stored in values buffer
-    valuesBuffer = new int[totalMiniBlockCount * config.miniBlockSizeInValues + 1];
+    valuesBuffer = new long[totalMiniBlockCount * config.miniBlockSizeInValues + 1];
   }
 
   @Override
@@ -105,6 +107,12 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
 
   @Override
   public int readInteger() {
+    // TODO: probably implement it separately
+    return (int) readLong();
+  }
+
+  @Override
+  public long readLong() {
     checkRead();
     return valuesBuffer[valuesRead++];
   }
@@ -117,7 +125,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
 
   private void loadNewBlockToBuffer() {
     try {
-      minDeltaInCurrentBlock = BytesUtils.readZigZagVarInt(in);
+      minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
     } catch (IOException e) {
       throw new ParquetDecodingException("can not read min delta in current block", e);
     }
@@ -127,7 +135,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
     // mini block is atomic for reading, we read a mini block when there are more values left
     int i;
     for (i = 0; i < config.miniBlockNumInABlock && valuesBuffered < totalValueCount; i++) {
-      BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidths[i]);
+      BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidths[i]);
       unpackMiniBlock(packer);
     }
 
@@ -144,13 +152,13 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    *
    * @param packer the packer created from bitwidth of current mini block
    */
-  private void unpackMiniBlock(BytePacker packer) {
+  private void unpackMiniBlock(BytePackerForLong packer) {
     for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
       unpack8Values(packer);
     }
   }
 
-  private void unpack8Values(BytePacker packer) {
+  private void unpack8Values(BytePackerForLong packer) {
     //calculate the pos because the packer api uses array not stream
     int pos = page.limit() - in.available();
     packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);

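The reader stores the first value and each block's min delta as zig-zag encoded varlongs. The zig-zag mapping assumed here for BytesUtils.readZigZagVarLong/writeZigZagVarLong is the standard one; a self-contained sketch of just that mapping:

public class ZigZagSketch {
  // signed -> unsigned: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
  static long encode(long n) {
    return (n << 1) ^ (n >> 63);
  }

  static long decode(long n) {
    return (n >>> 1) ^ -(n & 1);
  }

  public static void main(String[] args) {
    System.out.println(encode(-3));  // 5
    System.out.println(decode(5));   // -3
    // values close to zero (typical for min deltas) encode to very few varint bytes
  }
}
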
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
index 421182f..ac3c594 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
@@ -24,7 +24,7 @@ import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
 import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetEncodingException;
 
@@ -50,68 +50,40 @@ import java.io.IOException;
  *
  * @author Tianshuo Deng
  */
-public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
-  /**
-   * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
-   * reused between flushes.
-   */
-  public static final int MAX_BITWIDTH = 32;
+public abstract class DeltaBinaryPackingValuesWriter extends ValuesWriter {
 
   public static final int DEFAULT_NUM_BLOCK_VALUES = 128;
 
   public static final int DEFAULT_NUM_MINIBLOCKS = 4;
 
-  private final CapacityByteArrayOutputStream baos;
+  protected final CapacityByteArrayOutputStream baos;
 
   /**
    * stores blockSizeInValues, miniBlockNumInABlock and miniBlockSizeInValues
    */
-  private final DeltaBinaryPackingConfig config;
+  protected final DeltaBinaryPackingConfig config;
 
   /**
    * bit width for each mini block, reused between flushes
    */
-  private final int[] bitWidths;
+  protected final int[] bitWidths;
 
-  private int totalValueCount = 0;
+  protected int totalValueCount = 0;
 
   /**
    * a pointer to deltaBlockBuffer indicating the end of deltaBlockBuffer
    * the number of values in the deltaBlockBuffer that haven't flushed to baos
    * it will be reset after each flush
    */
-  private int deltaValuesToFlush = 0;
-
-  /**
-   * stores delta values starting from the 2nd value written(1st value is stored in header).
-   * It's reused between flushes
-   */
-  private int[] deltaBlockBuffer;
+  protected int deltaValuesToFlush = 0;
 
   /**
    * bytes buffer for a mini block, it is reused for each mini block.
    * Therefore the size of biggest miniblock with bitwith of MAX_BITWITH is allocated
    */
-  private byte[] miniBlockByteBuffer;
-
-  /**
-   * firstValue is written to the header of the page
-   */
-  private int firstValue = 0;
-
-  /**
-   * cache previous written value for calculating delta
-   */
-  private int previousValue = 0;
-
-  /**
-   * min delta is written to the beginning of each block.
-   * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
-   * therefore are all positive
-   * it will be reset after each flush
-   */
-  private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+  protected byte[] miniBlockByteBuffer;
 
+// TODO: remove this.
   public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize, ByteBufferAllocator allocator) {
     this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
   }
@@ -119,8 +91,6 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
   public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize, ByteBufferAllocator allocator) {
     this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
     bitWidths = new int[config.miniBlockNumInABlock];
-    deltaBlockBuffer = new int[blockSizeInValues];
-    miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
     baos = new CapacityByteArrayOutputStream(slabSize, pageSize, allocator);
   }
 
@@ -129,64 +99,7 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
     return baos.size();
   }
 
-  @Override
-  public void writeInteger(int v) {
-    totalValueCount++;
-
-    if (totalValueCount == 1) {
-      firstValue = v;
-      previousValue = firstValue;
-      return;
-    }
-
-    int delta = v - previousValue;//calculate delta
-    previousValue = v;
-
-    deltaBlockBuffer[deltaValuesToFlush++] = delta;
-
-    if (delta < minDeltaInCurrentBlock) {
-      minDeltaInCurrentBlock = delta;
-    }
-
-    if (config.blockSizeInValues == deltaValuesToFlush) {
-      flushBlockBuffer();
-    }
-  }
-
-  private void flushBlockBuffer() {
-    //since we store the min delta, the deltas will be converted to be the difference to min delta and all positive
-    for (int i = 0; i < deltaValuesToFlush; i++) {
-      deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
-    }
-
-    writeMinDelta();
-    int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
-
-    calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
-    for (int i = 0; i < config.miniBlockNumInABlock; i++) {
-      writeBitWidthForMiniBlock(i);
-    }
-
-    for (int i = 0; i < miniBlocksToFlush; i++) {
-      //writing i th miniblock
-      int currentBitWidth = bitWidths[i];
-      BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
-      int miniBlockStart = i * config.miniBlockSizeInValues;
-      for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {//8 values per pack
-        // mini block is atomic in terms of flushing
-        // This may write more values when reach to the end of data writing to last mini block,
-        // since it may not be aligend to miniblock,
-        // but doesnt matter. The reader uses total count to see if reached the end.
-        packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, 0);
-        baos.write(miniBlockByteBuffer, 0, currentBitWidth);
-      }
-    }
-
-    minDeltaInCurrentBlock = Integer.MAX_VALUE;
-    deltaValuesToFlush = 0;
-  }
-
-  private void writeBitWidthForMiniBlock(int i) {
+  protected void writeBitWidthForMiniBlock(int i) {
     try {
       BytesUtils.writeIntLittleEndianOnOneByte(baos, bitWidths[i]);
     } catch (IOException e) {
@@ -194,57 +107,10 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
     }
   }
 
-  private void writeMinDelta() {
-    try {
-      BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, baos);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("can not write min delta for block", e);
-    }
-  }
-
-  /**
-   * iterate through values in each mini block and calculate the bitWidths of max values.
-   *
-   * @param miniBlocksToFlush
-   */
-  private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
-    for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
-
-      int mask = 0;
-      int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
-
-      //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
-      int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
-
-      for (int i = miniStart; i < miniEnd; i++) {
-        mask |= deltaBlockBuffer[i];
-      }
-      bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
-    }
-  }
-
-  private int getMiniBlockCountToFlush(double numberCount) {
+  protected int getMiniBlockCountToFlush(double numberCount) {
     return (int) Math.ceil(numberCount / config.miniBlockSizeInValues);
   }
 
-  /**
-   * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
-   *
-   * @return
-   */
-  @Override
-  public BytesInput getBytes() {
-    //The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
-    if (deltaValuesToFlush != 0) {
-      flushBlockBuffer();
-    }
-    return BytesInput.concat(
-            config.toBytesInput(),
-            BytesInput.fromUnsignedVarInt(totalValueCount),
-            BytesInput.fromZigZagVarInt(firstValue),
-            BytesInput.from(baos));
-  }
-
   @Override
   public Encoding getEncoding() {
     return Encoding.DELTA_BINARY_PACKED;
@@ -255,7 +121,6 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
     this.totalValueCount = 0;
     this.baos.reset();
     this.deltaValuesToFlush = 0;
-    this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
   }
 
   @Override
@@ -263,7 +128,6 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
     this.totalValueCount = 0;
     this.baos.close();
     this.deltaValuesToFlush = 0;
-    this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java
new file mode 100644
index 0000000..f2d0acc
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java
@@ -0,0 +1,199 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import java.io.IOException;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Write integers (INT32) with delta encoding and binary packing.
+ * 
+ * @author Vassil Lunchev
+ */
+public class DeltaBinaryPackingValuesWriterForInteger extends DeltaBinaryPackingValuesWriter {
+  /**
+   * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+   * reused between flushes.
+   */
+  private static final int MAX_BITWIDTH = 32;
+
+  /**
+   * stores delta values starting from the 2nd value written(1st value is stored in header).
+   * It's reused between flushes
+   */
+  private int[] deltaBlockBuffer;
+
+  /**
+   * firstValue is written to the header of the page
+   */
+  private int firstValue = 0;
+
+  /**
+   * cache previous written value for calculating delta
+   */
+  private int previousValue = 0;
+
+  /**
+   * min delta is written to the beginning of each block.
+   * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+   * therefore are all positive
+   * it will be reset after each flush
+   */
+  private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+
+  public DeltaBinaryPackingValuesWriterForInteger(
+      int slabSize, int pageSize, ByteBufferAllocator allocator) {
+    this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
+  }
+
+  public DeltaBinaryPackingValuesWriterForInteger(int blockSizeInValues, int miniBlockNum, 
+      int slabSize, int pageSize, ByteBufferAllocator allocator) {
+    super(blockSizeInValues, miniBlockNum, slabSize, pageSize, allocator);
+    deltaBlockBuffer = new int[config.blockSizeInValues];
+    miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
+  }
+
+  @Override
+  public void writeInteger(int v) {
+    totalValueCount++;
+
+    if (totalValueCount == 1) {
+      firstValue = v;
+      previousValue = firstValue;
+      return;
+    }
+
+    // Calculate delta. The possible overflow is accounted for. The algorithm is correct because
+    // Java int arithmetic works as a modular ring with base 2^32 and because of the plus and minus
+    // properties of a ring. http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    int delta = v - previousValue;
+    previousValue = v;
+
+    deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+    if (delta < minDeltaInCurrentBlock) {
+      minDeltaInCurrentBlock = delta;
+    }
+
+    if (config.blockSizeInValues == deltaValuesToFlush) {
+      flushBlockBuffer();
+    }
+  }
+
+  private void flushBlockBuffer() {
+    // since we store the min delta, the deltas will be converted to be the difference to min delta
+    // and all positive
+    for (int i = 0; i < deltaValuesToFlush; i++) {
+      deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+    }
+
+    writeMinDelta();
+    int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+    calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+    for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+      writeBitWidthForMiniBlock(i);
+    }
+
+    for (int i = 0; i < miniBlocksToFlush; i++) {
+      // writing i th miniblock
+      int currentBitWidth = bitWidths[i];
+      int blockOffset = 0;
+      BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
+      int miniBlockStart = i * config.miniBlockSizeInValues;
+      for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {//8 values per pack
+        // mini block is atomic in terms of flushing
+        // This may write more values than needed when we reach the end of the data while
+        // writing the last mini block, since the data may not be aligned to a mini block,
+        // but that doesn't matter. The reader uses the total count to detect the end.
+        packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+        blockOffset += currentBitWidth;
+      }
+      baos.write(miniBlockByteBuffer, 0, blockOffset);
+    }
+
+    minDeltaInCurrentBlock = Integer.MAX_VALUE;
+    deltaValuesToFlush = 0;
+  }
+
+  private void writeMinDelta() {
+    try {
+      BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, baos);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("can not write min delta for block", e);
+    }
+  }
+
+  /**
+   * iterate through values in each mini block and calculate the bitWidths of max values.
+   *
+   * @param miniBlocksToFlush
+   */
+  private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+    for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+      int mask = 0;
+      int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
+
+      //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
+      int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
+
+      for (int i = miniStart; i < miniEnd; i++) {
+        mask |= deltaBlockBuffer[i];
+      }
+      bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
+    }
+  }
+
+  /**
+   * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+   *
+   * @return
+   */
+  @Override
+  public BytesInput getBytes() {
+    // The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+    if (deltaValuesToFlush != 0) {
+      flushBlockBuffer();
+    }
+    return BytesInput.concat(
+            config.toBytesInput(),
+            BytesInput.fromUnsignedVarInt(totalValueCount),
+            BytesInput.fromZigZagVarInt(firstValue),
+            BytesInput.from(baos));
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+  }
+}

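The overflow comment in writeInteger can be illustrated with a tiny stand-alone example (the values below are hypothetical, not from the patch): the delta may wrap, but adding it back to the previous value wraps in the opposite direction, so the round trip is exact.

public class DeltaOverflowExample {
  public static void main(String[] args) {
    int previous = Integer.MAX_VALUE;
    int v = Integer.MIN_VALUE;              // the true difference (+1) does not fit in an int

    int delta = v - previous;               // wraps modulo 2^32: delta == 1
    int reconstructed = previous + delta;   // wraps back: reconstructed == Integer.MIN_VALUE

    System.out.println(delta);              // 1
    System.out.println(reconstructed == v); // true
  }
}
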
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java
new file mode 100644
index 0000000..30eecef
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java
@@ -0,0 +1,201 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import java.io.IOException;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Write longs (INT64) with delta encoding and binary packing.
+ * 
+ * @author Vassil Lunchev
+ */
+public class DeltaBinaryPackingValuesWriterForLong extends DeltaBinaryPackingValuesWriter {
+  /**
+   * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+   * reused between flushes.
+   */
+  private static final int MAX_BITWIDTH = 64;
+
+  /**
+   * stores delta values starting from the 2nd value written(1st value is stored in header).
+   * It's reused between flushes
+   */
+  private long[] deltaBlockBuffer;
+
+  /**
+   * firstValue is written to the header of the page
+   */
+  private long firstValue = 0;
+
+  /**
+   * cache previous written value for calculating delta
+   */
+  private long previousValue = 0;
+
+  /**
+   * min delta is written to the beginning of each block.
+   * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+   * therefore are all positive
+   * it will be reset after each flush
+   */
+  private long minDeltaInCurrentBlock = Long.MAX_VALUE;
+
+  public DeltaBinaryPackingValuesWriterForLong(
+      int slabSize, int pageSize, ByteBufferAllocator allocator) {
+    this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
+  }
+
+  public DeltaBinaryPackingValuesWriterForLong(int blockSizeInValues, int miniBlockNum, 
+      int slabSize, int pageSize, ByteBufferAllocator allocator) {
+    super(blockSizeInValues, miniBlockNum, slabSize, pageSize, allocator);
+    deltaBlockBuffer = new long[config.blockSizeInValues];
+    miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
+  }
+
+  @Override
+  public void writeLong(long v) {
+    totalValueCount++;
+
+    if (totalValueCount == 1) {
+      firstValue = v;
+      previousValue = firstValue;
+      return;
+    }
+
+    // Calculate delta. The possible overflow is accounted for. The algorithm is correct because
+    // Java long arithmetic works as a modular ring with base 2^64 and because of the plus and minus
+    // properties of a ring. http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+    long delta = v - previousValue;
+    previousValue = v;
+
+    deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+    if (delta < minDeltaInCurrentBlock) {
+      minDeltaInCurrentBlock = delta;
+    }
+
+    if (config.blockSizeInValues == deltaValuesToFlush) {
+      flushBlockBuffer();
+    }
+  }
+
+  private void flushBlockBuffer() {
+    // since we store the min delta, the deltas will be converted to be the difference to min delta
+    // and all positive
+    for (int i = 0; i < deltaValuesToFlush; i++) {
+      deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+    }
+
+    writeMinDelta();
+    int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+    calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+    for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+      writeBitWidthForMiniBlock(i);
+    }
+
+    for (int i = 0; i < miniBlocksToFlush; i++) {
+      // writing i th miniblock
+      int currentBitWidth = bitWidths[i];
+      int blockOffset = 0;
+      // TODO: should this cache the packer?
+      BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth);
+      int miniBlockStart = i * config.miniBlockSizeInValues;
+      // pack values into the miniblock buffer, 8 at a time to get exactly currentBitWidth bytes
+      for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {
+        // mini block is atomic in terms of flushing
+        // This may write more values than needed when we reach the end of the data while
+        // writing the last mini block, since the data may not be aligned to a mini block,
+        // but that doesn't matter. The reader uses the total count to detect the end.
+        packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+        blockOffset += currentBitWidth;
+      }
+      baos.write(miniBlockByteBuffer, 0, blockOffset);
+    }
+
+    minDeltaInCurrentBlock = Long.MAX_VALUE;
+    deltaValuesToFlush = 0;
+  }
+
+  private void writeMinDelta() {
+    try {
+      BytesUtils.writeZigZagVarLong(minDeltaInCurrentBlock, baos);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("can not write min delta for block", e);
+    }
+  }
+
+  /**
+   * iterate through values in each mini block and calculate the bitWidths of max values.
+   *
+   * @param miniBlocksToFlush
+   */
+  private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+    for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+      long mask = 0;
+      int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
+
+      //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
+      int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
+
+      for (int i = miniStart; i < miniEnd; i++) {
+        mask |= deltaBlockBuffer[i];
+      }
+      bitWidths[miniBlockIndex] = 64 - Long.numberOfLeadingZeros(mask);
+    }
+  }
+
+  /**
+   * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+   *
+   * @return
+   */
+  @Override
+  public BytesInput getBytes() {
+    // The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+    if (deltaValuesToFlush != 0) {
+      flushBlockBuffer();
+    }
+    return BytesInput.concat(
+            config.toBytesInput(),
+            BytesInput.fromUnsignedVarInt(totalValueCount),
+            BytesInput.fromZigZagVarLong(firstValue),
+            BytesInput.from(baos));
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    this.minDeltaInCurrentBlock = Long.MAX_VALUE;
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    this.minDeltaInCurrentBlock = Long.MAX_VALUE;
+  }
+}

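Putting the long writer and reader together, a toy block looks like this (hypothetical numbers, with bit packing and varlong encoding omitted): the writer stores the first value in the header, the zig-zag min delta per block, and only the non-negative (delta - minDelta) values in the mini blocks.

public class DeltaBlockWalkthrough {
  public static void main(String[] args) {
    long[] values = {7, 5, 3};

    // writer side
    long firstValue = values[0];                                     // page header
    long[] deltas = {values[1] - values[0], values[2] - values[1]};  // {-2, -2}
    long minDelta = Math.min(deltas[0], deltas[1]);                  // -2, written per block
    long[] stored = {deltas[0] - minDelta, deltas[1] - minDelta};    // {0, 0} -> bit width 0

    // reader side: value[i] = value[i-1] + minDelta + stored[i-1]
    long v1 = firstValue + minDelta + stored[0];  // 5
    long v2 = v1 + minDelta + stored[1];          // 3
    System.out.println(v1 + " " + v2);
  }
}
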
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
index 2d6b213..f7ad912 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
@@ -28,6 +28,7 @@ import org.apache.parquet.bytes.LittleEndianDataOutputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Binary;
 
@@ -52,7 +53,7 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
   public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
     arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
     out = new LittleEndianDataOutputStream(arrayOut);
-    lengthWriter = new DeltaBinaryPackingValuesWriter(
+    lengthWriter = new DeltaBinaryPackingValuesWriterForInteger(
         DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
         DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
         initialSize, pageSize, allocator);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
index 1604ddb..fb6cc9b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
@@ -23,6 +23,7 @@ import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
 import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
 import org.apache.parquet.io.api.Binary;
 
@@ -43,7 +44,8 @@ public class DeltaByteArrayWriter extends ValuesWriter{
   private byte[] previous;
 
   public DeltaByteArrayWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
-    this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize, allocator);
+    this.prefixLengthWriter = 
+        new DeltaBinaryPackingValuesWriterForInteger(128, 4, initialCapacity, pageSize, allocator);
     this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize, allocator);
     this.previous = new byte[0];
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
new file mode 100644
index 0000000..a3bec4a
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
@@ -0,0 +1,266 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class DeltaBinaryPackingValuesWriterForIntegerTest {
+  DeltaBinaryPackingValuesReader reader;
+  private int blockSize;
+  private int miniBlockNum;
+  private ValuesWriter writer;
+  private Random random;
+
+  @Before
+  public void setUp() {
+    blockSize = 128;
+    miniBlockNum = 4;
+    writer = new DeltaBinaryPackingValuesWriterForInteger(
+        blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
+    random = new Random(0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void miniBlockSizeShouldBeMultipleOf8() {
+    new DeltaBinaryPackingValuesWriterForInteger(
+        1281, 4, 100, 100, new DirectByteBufferAllocator());
+  }
+
+  /* When data size is a multiple of the block size */
+  @Test
+  public void shouldWriteWhenDataIsAlignedWithBlock() throws IOException {
+    int[] data = new int[5 * blockSize];
+    for (int i = 0; i < blockSize * 5; i++) {
+      data[i] = random.nextInt();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenBlockIsNotFullyWritten() throws IOException {
+    int[] data = new int[blockSize - 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenAMiniBlockIsNotFullyWritten() throws IOException {
+    int miniBlockSize = blockSize / miniBlockNum;
+    int[] data = new int[miniBlockSize - 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteNegativeDeltas() throws IOException {
+    int[] data = new int[blockSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = 10 - (i * 32 - random.nextInt(6));
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenDeltasAreSame() throws IOException {
+    int[] data = new int[2 * blockSize];
+    for (int i = 0; i < blockSize; i++) {
+      data[i] = i * 32;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenValuesAreSame() throws IOException {
+    int[] data = new int[2 * blockSize];
+    for (int i = 0; i < blockSize; i++) {
+      data[i] = 3;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteWhenDeltaIs0ForEachBlock() throws IOException {
+    int[] data = new int[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (i - 1) / blockSize;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReadWriteWhenDataIsNotAlignedWithBlock() throws IOException {
+    int[] data = new int[5 * blockSize + 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt(20) - 10;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReadMaxMinValue() throws IOException {
+    int[] data = new int[10];
+    for (int i = 0; i < data.length; i++) {
+      if (i % 2 == 0) {
+        data[i] = Integer.MIN_VALUE;
+      } else {
+        data[i] = Integer.MAX_VALUE;
+      }
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
+    int[] data = new int[2 * blockSize + 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    writeData(data);
+
+    reader = new DeltaBinaryPackingValuesReader();
+    BytesInput bytes = writer.getBytes();
+    byte[] valueContent = bytes.toByteArray();
+    byte[] pageContent = new byte[valueContent.length * 10];
+    int contentOffsetInPage = 33;
+    System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
+
+    //offset should be correct
+    reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
+    int offset = reader.getNextOffset();
+    assertEquals(valueContent.length + contentOffsetInPage, offset);
+
+    // should be able to read data correctly
+    for (int i : data) {
+      assertEquals(i, reader.readInteger());
+    }
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenReadMoreThanWritten() throws IOException {
+    int[] data = new int[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    shouldWriteAndRead(data);
+    try {
+      reader.readInteger();
+    } catch (ParquetDecodingException e) {
+      assertEquals("no more value to read, total value count is " + data.length, e.getMessage());
+    }
+
+  }
+
+  @Test
+  public void shouldSkip() throws IOException {
+    int[] data = new int[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    writeData(data);
+    reader = new DeltaBinaryPackingValuesReader();
+    reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+    for (int i = 0; i < data.length; i++) {
+      if (i % 3 == 0) {
+        reader.skip();
+      } else {
+        assertEquals(i * 32, reader.readInteger());
+      }
+    }
+  }
+
+  @Test
+  public void shouldReset() throws IOException {
+    shouldReadWriteWhenDataIsNotAlignedWithBlock();
+    int[] data = new int[5 * blockSize];
+    for (int i = 0; i < blockSize * 5; i++) {
+      data[i] = i * 2;
+    }
+    writer.reset();
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void randomDataTest() throws IOException {
+    int maxSize = 1000;
+    int[] data = new int[maxSize];
+
+    for (int round = 0; round < 100000; round++) {
+
+
+      int size = random.nextInt(maxSize);
+
+      for (int i = 0; i < size; i++) {
+        data[i] = random.nextInt();
+      }
+      shouldReadAndWrite(data, size);
+      writer.reset();
+    }
+  }
+
+  private void shouldWriteAndRead(int[] data) throws IOException {
+    shouldReadAndWrite(data, data.length);
+  }
+
+  private void shouldReadAndWrite(int[] data, int length) throws IOException {
+    writeData(data, length);
+    reader = new DeltaBinaryPackingValuesReader();
+    byte[] page = writer.getBytes().toByteArray();
+    int miniBlockSize = blockSize / miniBlockNum;
+
+    double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize);
+    double blockFlushed = Math.ceil(((double) length - 1) / blockSize);
+    double estimatedSize = 4 * 5 //blockHeader
+        + 4 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
+        + blockFlushed * miniBlockNum //bitWidth of mini blocks
+        + (5.0 * blockFlushed);//min delta for each block
+    assertTrue(estimatedSize >= page.length);
+    reader.initFromPage(100, ByteBuffer.wrap(page), 0);
+
+    for (int i = 0; i < length; i++) {
+      assertEquals(data[i], reader.readInteger());
+    }
+  }
+
+  private void writeData(int[] data) {
+    writeData(data, data.length);
+  }
+
+  private void writeData(int[] data, int length) {
+    for (int i = 0; i < length; i++) {
+      writer.writeInteger(data[i]);
+    }
+  }
+}
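
A rough way to read the assertTrue bound in shouldReadAndRead/shouldReadAndWrite above: each flushed mini block is padded to whole 4-byte values, each flushed block adds one bit-width byte per mini block plus a var-int min delta, and the fixed 4 * 5 term is the block header. The sketch below (class name hypothetical, length chosen arbitrarily, test defaults blockSize = 128 and miniBlockNum = 4 assumed) simply evaluates that bound for one input size; it is an illustration, not part of the change itself.

    // Illustrative only: evaluates the upper bound asserted in shouldReadAndWrite
    // for length = 131 with blockSize = 128, miniBlockNum = 4 (miniBlockSize = 32).
    public class DeltaIntPageSizeBoundExample {
      public static void main(String[] args) {
        int blockSize = 128, miniBlockNum = 4, length = 131;
        int miniBlockSize = blockSize / miniBlockNum;                               // 32
        double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize); // 5
        double blockFlushed = Math.ceil(((double) length - 1) / blockSize);         // 2
        double estimatedSize = 4 * 5                 // block header, as in the test comment
            + 4 * miniBlockFlushed * miniBlockSize   // worst case 4 bytes per value, mini-block aligned
            + blockFlushed * miniBlockNum            // one bit-width byte per mini block
            + 5.0 * blockFlushed;                    // var-int min delta per flushed block
        System.out.println(estimatedSize);           // 678.0
      }
    }

Any page the writer produces for 131 random ints should come in at or under that figure, which is what the test asserts.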

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
new file mode 100644
index 0000000..34e1800
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
@@ -0,0 +1,263 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class DeltaBinaryPackingValuesWriterForLongTest {
+  DeltaBinaryPackingValuesReader reader;
+  private int blockSize;
+  private int miniBlockNum;
+  private ValuesWriter writer;
+  private Random random;
+
+  @Before
+  public void setUp() {
+    blockSize = 128;
+    miniBlockNum = 4;
+    writer = new DeltaBinaryPackingValuesWriterForLong(
+        blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
+    random = new Random(0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void miniBlockSizeShouldBeMultipleOf8() {
+    new DeltaBinaryPackingValuesWriterForLong(
+        1281, 4, 100, 100, new DirectByteBufferAllocator());
+  }
+
+  /* When data size is a multiple of the block size */
+  @Test
+  public void shouldWriteWhenDataIsAlignedWithBlock() throws IOException {
+    long[] data = new long[5 * blockSize];
+    for (int i = 0; i < blockSize * 5; i++) {
+      data[i] = random.nextLong();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenBlockIsNotFullyWritten() throws IOException {
+    long[] data = new long[blockSize - 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextLong();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenAMiniBlockIsNotFullyWritten() throws IOException {
+    int miniBlockSize = blockSize / miniBlockNum;
+    long[] data = new long[miniBlockSize - 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextLong();
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteNegativeDeltas() throws IOException {
+    long[] data = new long[blockSize];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = 10 - (i * 32 - random.nextInt(6));
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenDeltasAreSame() throws IOException {
+    long[] data = new long[2 * blockSize];
+    for (int i = 0; i < blockSize; i++) {
+      data[i] = i * 32;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteAndReadWhenValuesAreSame() throws IOException {
+    long[] data = new long[2 * blockSize];
+    for (int i = 0; i < blockSize; i++) {
+      data[i] = 3;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldWriteWhenDeltaIs0ForEachBlock() throws IOException {
+    long[] data = new long[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (i - 1) / blockSize;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReadWriteWhenDataIsNotAlignedWithBlock() throws IOException {
+    long[] data = new long[5 * blockSize + 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = random.nextInt(20) - 10;
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReadMaxMinValue() throws IOException {
+    long[] data = new long[10];
+    for (int i = 0; i < data.length; i++) {
+      if (i % 2 == 0) {
+        data[i] = Long.MIN_VALUE;
+      } else {
+        data[i] = Long.MAX_VALUE;
+      }
+    }
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
+    long[] data = new long[2 * blockSize + 3];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    writeData(data);
+
+    reader = new DeltaBinaryPackingValuesReader();
+    BytesInput bytes = writer.getBytes();
+    byte[] valueContent = bytes.toByteArray();
+    byte[] pageContent = new byte[valueContent.length * 10];
+    int contentOffsetInPage = 33;
+    System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
+
+    //offset should be correct
+    reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
+    int offset = reader.getNextOffset();
+    assertEquals(valueContent.length + contentOffsetInPage, offset);
+
+    // should be able to read data correctly
+    for (long i : data) {
+      assertEquals(i, reader.readLong());
+    }
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenReadMoreThanWritten() throws IOException {
+    long[] data = new long[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    shouldWriteAndRead(data);
+    try {
+      reader.readLong();
+    } catch (ParquetDecodingException e) {
+      assertEquals("no more value to read, total value count is " + data.length, e.getMessage());
+    }
+  }
+
+  @Test
+  public void shouldSkip() throws IOException {
+    long[] data = new long[5 * blockSize + 1];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i * 32;
+    }
+    writeData(data);
+    reader = new DeltaBinaryPackingValuesReader();
+    reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+    for (int i = 0; i < data.length; i++) {
+      if (i % 3 == 0) {
+        reader.skip();
+      } else {
+        assertEquals(i * 32, reader.readLong());
+      }
+    }
+  }
+
+  @Test
+  public void shouldReset() throws IOException {
+    shouldReadWriteWhenDataIsNotAlignedWithBlock();
+    long[] data = new long[5 * blockSize];
+    for (int i = 0; i < blockSize * 5; i++) {
+      data[i] = i * 2;
+    }
+    writer.reset();
+    shouldWriteAndRead(data);
+  }
+
+  @Test
+  public void randomDataTest() throws IOException {
+    int maxSize = 1000;
+    long[] data = new long[maxSize];
+
+    for (int round = 0; round < 100000; round++) {
+      int size = random.nextInt(maxSize);
+
+      for (int i = 0; i < size; i++) {
+        data[i] = random.nextLong();
+      }
+      shouldReadAndWrite(data, size);
+      writer.reset();
+    }
+  }
+
+  private void shouldWriteAndRead(long[] data) throws IOException {
+    shouldReadAndWrite(data, data.length);
+  }
+
+  private void shouldReadAndWrite(long[] data, int length) throws IOException {
+    writeData(data, length);
+    reader = new DeltaBinaryPackingValuesReader();
+    byte[] page = writer.getBytes().toByteArray();
+    int miniBlockSize = blockSize / miniBlockNum;
+
+    double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize);
+    double blockFlushed = Math.ceil(((double) length - 1) / blockSize);
+    double estimatedSize = 3 * 5 + 1 * 10 //blockHeader, 3 * int + 1 * long
+        + 8 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
+        + blockFlushed * miniBlockNum //bitWidth of mini blocks
+        + (10.0 * blockFlushed);//min delta for each block
+    assertTrue(estimatedSize >= page.length);
+    reader.initFromPage(100, page, 0);
+
+    for (int i = 0; i < length; i++) {
+      assertEquals(data[i], reader.readLong());
+    }
+  }
+
+  private void writeData(long[] data) {
+    writeData(data, data.length);
+  }
+
+  private void writeData(long[] data, int length) {
+    for (int i = 0; i < length; i++) {
+      writer.writeLong(data[i]);
+    }
+  }
+}
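
The long variant asserts the same kind of bound, except the header grows to three ints plus one var-long first value, the worst-case payload is 8 bytes per value, and each flushed block's min delta is a var-long of up to 10 bytes. A matching sketch (hypothetical class name, same assumed test defaults), again only illustrative:

    // Illustrative only: the bound asserted by the long test's shouldReadAndWrite,
    // evaluated for length = 131 with blockSize = 128, miniBlockNum = 4.
    public class DeltaLongPageSizeBoundExample {
      public static void main(String[] args) {
        int blockSize = 128, miniBlockNum = 4, length = 131;
        int miniBlockSize = blockSize / miniBlockNum;                               // 32
        double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize); // 5
        double blockFlushed = Math.ceil(((double) length - 1) / blockSize);         // 2
        double estimatedSize = 3 * 5 + 1 * 10        // block header: 3 ints + 1 long, per the test comment
            + 8 * miniBlockFlushed * miniBlockSize   // worst case 8 bytes per packed value
            + blockFlushed * miniBlockNum            // one bit-width byte per mini block
            + (10.0 * blockFlushed);                 // var-long min delta per flushed block
        System.out.println(estimatedSize);           // 1333.0
      }
    }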

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
deleted file mode 100644
index 6308e47..0000000
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.column.values.delta;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Random;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.parquet.bytes.DirectByteBufferAllocator;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.io.ParquetDecodingException;
-
-public class DeltaBinaryPackingValuesWriterTest {
-  DeltaBinaryPackingValuesReader reader;
-  private int blockSize;
-  private int miniBlockNum;
-  private ValuesWriter writer;
-  private Random random;
-
-  @Before
-  public void setUp() {
-    blockSize = 128;
-    miniBlockNum = 4;
-    writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
-    random = new Random();
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void miniBlockSizeShouldBeMultipleOf8() {
-    new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100, new DirectByteBufferAllocator());
-  }
-
-  /* When data size is multiple of Block*/
-  @Test
-  public void shouldWriteWhenDataIsAlignedWithBlock() throws IOException {
-    int[] data = new int[5 * blockSize];
-    for (int i = 0; i < blockSize * 5; i++) {
-      data[i] = random.nextInt();
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldWriteAndReadWhenBlockIsNotFullyWritten() throws IOException {
-    int[] data = new int[blockSize - 3];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = random.nextInt();
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldWriteAndReadWhenAMiniBlockIsNotFullyWritten() throws IOException {
-    int miniBlockSize = blockSize / miniBlockNum;
-    int[] data = new int[miniBlockSize - 3];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = random.nextInt();
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldWriteNegativeDeltas() throws IOException {
-    int[] data = new int[blockSize];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = 10 - (i * 32 - random.nextInt(6));
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldWriteAndReadWhenDeltasAreSame() throws IOException {
-    int[] data = new int[2 * blockSize];
-    for (int i = 0; i < blockSize; i++) {
-      data[i] = i * 32;
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldWriteAndReadWhenValuesAreSame() throws IOException {
-    int[] data = new int[2 * blockSize];
-    for (int i = 0; i < blockSize; i++) {
-      data[i] = 3;
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldWriteWhenDeltaIs0ForEachBlock() throws IOException {
-    int[] data = new int[5 * blockSize + 1];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = (i - 1) / blockSize;
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldReadWriteWhenDataIsNotAlignedWithBlock() throws IOException {
-    int[] data = new int[5 * blockSize + 3];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = random.nextInt(20) - 10;
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldReadMaxMinValue() throws IOException {
-    int[] data = new int[10];
-    for (int i = 0; i < data.length; i++) {
-      if(i%2==0) {
-        data[i]=Integer.MIN_VALUE;
-      }else {
-        data[i]=Integer.MAX_VALUE;
-      }
-    }
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
-    int[] data = new int[2 * blockSize + 3];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = i * 32;
-    }
-    writeData(data);
-
-    reader = new DeltaBinaryPackingValuesReader();
-    BytesInput bytes = writer.getBytes();
-    byte[] valueContent = bytes.toByteArray();
-    byte[] pageContent = new byte[valueContent.length * 10];
-    int contentOffsetInPage = 33;
-    System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
-
-    //offset should be correct
-    reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
-    int offset= reader.getNextOffset();
-    assertEquals(valueContent.length + contentOffsetInPage, offset);
-
-    //should be able to read data correclty
-    for (int i : data) {
-      assertEquals(i, reader.readInteger());
-    }
-  }
-
-  @Test
-  public void shouldThrowExceptionWhenReadMoreThanWritten() throws IOException {
-    int[] data = new int[5 * blockSize + 1];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = i * 32;
-    }
-    shouldWriteAndRead(data);
-    try {
-      reader.readInteger();
-    } catch (ParquetDecodingException e) {
-      assertEquals("no more value to read, total value count is " + data.length, e.getMessage());
-    }
-
-  }
-
-  @Test
-  public void shouldSkip() throws IOException {
-    int[] data = new int[5 * blockSize + 1];
-    for (int i = 0; i < data.length; i++) {
-      data[i] = i * 32;
-    }
-    writeData(data);
-    reader = new DeltaBinaryPackingValuesReader();
-    reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
-    for (int i = 0; i < data.length; i++) {
-      if (i % 3 == 0) {
-        reader.skip();
-      } else {
-        assertEquals(i * 32, reader.readInteger());
-      }
-    }
-  }
-
-  @Test
-  public void shouldReset() throws IOException {
-    shouldReadWriteWhenDataIsNotAlignedWithBlock();
-    int[] data = new int[5 * blockSize];
-    for (int i = 0; i < blockSize * 5; i++) {
-      data[i] = i * 2;
-    }
-    writer.reset();
-    shouldWriteAndRead(data);
-  }
-
-  @Test
-  public void randomDataTest() throws IOException {
-    int maxSize = 1000;
-    int[] data = new int[maxSize];
-
-    for (int round = 0; round < 100000; round++) {
-
-
-      int size = random.nextInt(maxSize);
-
-      for (int i = 0; i < size; i++) {
-        data[i] = random.nextInt();
-      }
-      shouldReadAndWrite(data, size);
-      writer.reset();
-    }
-  }
-
-  private void shouldWriteAndRead(int[] data) throws IOException {
-    shouldReadAndWrite(data, data.length);
-  }
-
-  private void shouldReadAndWrite(int[] data, int length) throws IOException {
-    writeData(data, length);
-    reader = new DeltaBinaryPackingValuesReader();
-    byte[] page = writer.getBytes().toByteArray();
-    int miniBlockSize = blockSize / miniBlockNum;
-
-    double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize);
-    double blockFlushed = Math.ceil(((double) length - 1) / blockSize);
-    double estimatedSize = 4 * 5 //blockHeader
-        + 4 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
-        + blockFlushed * miniBlockNum //bitWidth of mini blocks
-        + (5.0 * blockFlushed);//min delta for each block
-    assertTrue(estimatedSize >= page.length);
-    reader.initFromPage(100, ByteBuffer.wrap(page), 0);
-
-    for (int i = 0; i < length; i++) {
-      assertEquals(data[i], reader.readInteger());
-    }
-  }
-
-  private void writeData(int[] data) {
-    writeData(data, data.length);
-  }
-
-  private void writeData(int[] data, int length) {
-    for (int i = 0; i < length; i++) {
-      writer.writeInteger(data[i]);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
index 40f6bfc..43cce3a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
@@ -18,11 +18,13 @@
  */
 package org.apache.parquet.column.values.delta.benchmark;
 
-import org.junit.Test;
+import java.util.Random;
+
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import java.util.Random;
+import org.junit.Test;
 
 public class BenchmarkIntegerOutputSize {
   public static int blockSize=128;
@@ -78,8 +80,10 @@ public class BenchmarkIntegerOutputSize {
   }
 
   public void testRandomIntegers(IntFunc func,int bitWidth) {
-    DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
-    RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000, new DirectByteBufferAllocator());
+    DeltaBinaryPackingValuesWriter delta = new DeltaBinaryPackingValuesWriterForInteger(
+        blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+    RunLengthBitPackingHybridValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(
+        bitWidth, 100, 20000, new DirectByteBufferAllocator());
     for (int i = 0; i < dataSize; i++) {
       int v = func.getIntValue();
       delta.writeInteger(v);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
index 4ad5dad..488208c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -18,24 +18,25 @@
  */
 package org.apache.parquet.column.values.delta.benchmark;
 
-import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
-import com.carrotsearch.junitbenchmarks.BenchmarkRule;
-import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
-import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Random;
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
 
 @AxisRange(min = 0, max = 1)
 @BenchmarkMethodChart(filePrefix = "benchmark-encoding-reading-random")
@@ -56,8 +57,10 @@ public class BenchmarkReadingRandomIntegers {
       data[i] = random.nextInt(100) - 200;
     }
 
-    ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
-    ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator());
+    ValuesWriter delta = new DeltaBinaryPackingValuesWriterForInteger(
+        blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+    ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(
+        32, 100, 20000, new DirectByteBufferAllocator());
 
     for (int i = 0; i < data.length; i++) {
       delta.writeInteger(data[i]);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
index 80e6533..f63eeda 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
@@ -18,18 +18,21 @@
  */
 package org.apache.parquet.column.values.delta.benchmark;
 
-import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
-import com.carrotsearch.junitbenchmarks.BenchmarkRule;
-import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
-import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import java.util.Random;
+
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import java.util.Random;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
 
 @AxisRange(min = 0, max = 1)
 @BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random")
@@ -51,7 +54,8 @@ public class RandomWritingBenchmarkTest extends BenchMarkTest{
   @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
   @Test
   public void writeDeltaPackingTest(){
-    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriterForInteger(
+        blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
     runWriteTest(writer);
   }
 
@@ -65,7 +69,8 @@ public class RandomWritingBenchmarkTest extends BenchMarkTest{
   @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
   @Test
   public void writeDeltaPackingTest2(){
-    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriterForInteger(
+        blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
     runWriteTest(writer);
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
index d40721a..049f7bd 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
@@ -234,6 +234,42 @@ public class BytesUtils {
   }
 
   /**
+   * Uses the zig-zag trick described in https://developers.google.com/protocol-buffers/docs/encoding to read zig-zag encoded data.
+   * TODO: the implementation is compatible with readZigZagVarInt. Is there a need for different functions?
+   * @param in the stream to read the encoded value from
+   * @return the decoded long value
+   * @throws IOException if the underlying stream cannot be read
+   */
+  public static long readZigZagVarLong(InputStream in) throws IOException {
+    long raw = readUnsignedVarLong(in);
+    long temp = (((raw << 63) >> 63) ^ raw) >> 1;
+    return temp ^ (raw & (1L << 63));
+  }
+
+  public static long readUnsignedVarLong(InputStream in) throws IOException {
+    long value = 0;
+    int i = 0;
+    long b;
+    while (((b = in.read()) & 0x80) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+    }
+    return value | (b << i);
+  }
+
+  public static void writeUnsignedVarLong(long value, OutputStream out) throws IOException {
+    while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+      out.write((int)((value & 0x7F) | 0x80));
+      value >>>= 7;
+    }
+    out.write((int)(value & 0x7F));
+  }
+
+  public static void writeZigZagVarLong(long longValue, OutputStream out) throws IOException{
+    writeUnsignedVarLong((longValue << 1) ^ (longValue >> 63), out);
+  }
+
+  /**
    * @param bitLength a count of bits
    * @return the corresponding byte count padded to the next byte
    */
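
For anyone unfamiliar with the encoding referenced in the Javadoc above, the sketch below (class name hypothetical, sample values arbitrary) round-trips a few longs through the new helpers. Small magnitudes, positive or negative, should come out as one or two bytes, while Long.MIN_VALUE and Long.MAX_VALUE take the full ten.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.parquet.bytes.BytesUtils;

    // Illustrative only: round-trips a few values through the new zig-zag var-long helpers.
    public class ZigZagVarLongExample {
      public static void main(String[] args) throws IOException {
        long[] samples = { 0L, -1L, 1L, -300L, Long.MAX_VALUE, Long.MIN_VALUE };
        for (long value : samples) {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          BytesUtils.writeZigZagVarLong(value, out);
          byte[] encoded = out.toByteArray();
          long decoded = BytesUtils.readZigZagVarLong(new ByteArrayInputStream(encoded));
          System.out.println(value + " -> " + encoded.length + " byte(s) -> " + decoded);
        }
      }
    }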

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 40190ee..cd9c6b2 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -125,6 +125,23 @@ abstract public class BytesInput {
   }
 
   /**
+   * @param longValue the long to write
+   * @return a BytesInput that will write the value as an unsigned var long
+   */
+  public static BytesInput fromUnsignedVarLong(long longValue) {
+    return new UnsignedVarLongBytesInput(longValue);
+  }
+
+  /**
+   * @param longValue the long to write
+   * @return a BytesInput that will write the value as a zig-zag encoded var long
+   */
+  public static BytesInput fromZigZagVarLong(long longValue) {
+    long zigZag = (longValue << 1) ^ (longValue >> 63);
+    return new UnsignedVarLongBytesInput(zigZag);
+  }
+
+  /**
    * @param arrayOut
    * @return a BytesInput that will write the content of the buffer
    */
@@ -320,7 +337,27 @@ abstract public class BytesInput {
 
     @Override
     public long size() {
-      int s = 5 - ((Integer.numberOfLeadingZeros(intValue) + 3) / 7);
+      int s = (38 - Integer.numberOfLeadingZeros(intValue)) / 7;
+      return s == 0 ? 1 : s;
+    }
+  }
+
+  private static class UnsignedVarLongBytesInput extends BytesInput {
+
+    private final long longValue;
+
+    public UnsignedVarLongBytesInput(long longValue) {
+      this.longValue = longValue;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      BytesUtils.writeUnsignedVarLong(longValue, out);
+    }
+
+    @Override
+    public long size() {
+      int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7;
       return s == 0 ? 1 : s;
     }
   }
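
A quick sanity check of the size() formula just added: an unsigned var long spends one byte per 7 significant bits, and (70 - Long.numberOfLeadingZeros(v)) / 7 is exactly ceil((64 - leadingZeros) / 7), with zero special-cased to one byte. The sketch below (class name hypothetical, sample values arbitrary) compares the formula against what writeUnsignedVarLong actually emits.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.parquet.bytes.BytesUtils;

    // Illustrative only: the size() formula should match the bytes actually written.
    public class VarLongSizeCheck {
      public static void main(String[] args) throws IOException {
        long[] samples = { 0L, 1L, 127L, 128L, 1L << 35, -1L /* 64 significant bits */ };
        for (long v : samples) {
          int formula = (70 - Long.numberOfLeadingZeros(v)) / 7;
          if (formula == 0) formula = 1;
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          BytesUtils.writeUnsignedVarLong(v, out);
          System.out.println(v + ": formula=" + formula + ", written=" + out.size());
        }
      }
    }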

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java
new file mode 100644
index 0000000..9859f5b
--- /dev/null
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Packs and unpacks INT64 values into bytes.
+ *
+ * Packing and unpacking operate on:
+ *  - n values at a time (with n % 8 == 0)
+ *  - bitWidth * (n / 8) bytes at a time
+ *
+ * @author Vassil Lunchev
+ *
+ */
+public abstract class BytePackerForLong {
+
+  private final int bitWidth;
+
+  BytePackerForLong(int bitWidth) {
+    this.bitWidth = bitWidth;
+  }
+
+  /**
+   * @return the width in bits used for encoding, also how many bytes are packed/unpacked at a time
+   *         by pack8Values/unpack8Values
+   */
+  public final int getBitWidth() {
+    return bitWidth;
+  }
+
+  /**
+   * pack 8 values from input at inPos into bitWidth bytes in output at outPos. nextPosition: inPos
+   * += 8; outPos += getBitWidth()
+   * 
+   * @param input the input values
+   * @param inPos where to read from in input
+   * @param output the output bytes
+   * @param outPos where to write to in output
+   */
+  public abstract void pack8Values(final long[] input, final int inPos, final byte[] output,
+      final int outPos);
+
+  /**
+   * pack 32 values from input at inPos into bitWidth * 4 bytes in output at outPos. nextPosition:
+   * inPos += 32; outPos += getBitWidth() * 4
+   * 
+   * @param input the input values
+   * @param inPos where to read from in input
+   * @param output the output bytes
+   * @param outPos where to write to in output
+   */
+  public abstract void pack32Values(long[] input, int inPos, byte[] output, int outPos);
+
+  /**
+   * unpack bitWidth bytes from input at inPos into 8 values in output at outPos. nextPosition:
+   * inPos += getBitWidth(); outPos += 8
+   * 
+   * @param input the input bytes
+   * @param inPos where to read from in input
+   * @param output the output values
+   * @param outPos where to write to in output
+   */
+  public abstract void unpack8Values(final ByteBuffer input, final int inPos, final long[] output,
+      final int outPos);
+
+  /**
+   * unpack bitWidth * 4 bytes from input at inPos into 32 values in output at outPos. nextPosition:
+   * inPos += getBitWidth() * 4; outPos += 32
+   * 
+   * @param input the input bytes
+   * @param inPos where to read from in input
+   * @param output the output values
+   * @param outPos where to write to in output
+   */
+  public abstract void unpack32Values(ByteBuffer input, int inPos, long[] output, int outPos);
+
+  /**
+   * unpack bitWidth bytes from input at inPos into 8 values in output at outPos. nextPosition:
+   * inPos += getBitWidth(); outPos += 8
+   * 
+   * @param input the input bytes
+   * @param inPos where to read from in input
+   * @param output the output values
+   * @param outPos where to write to in output
+   */
+  public abstract void unpack8Values(final byte[] input, final int inPos, final long[] output,
+      final int outPos);
+
+  /**
+   * unpack bitWidth * 4 bytes from input at inPos into 32 values in output at outPos. nextPosition:
+   * inPos += getBitWidth() * 4; outPos += 32
+   * 
+   * @param input the input bytes
+   * @param inPos where to read from in input
+   * @param output the output values
+   * @param outPos where to write to in output
+   */
+  public abstract void unpack32Values(byte[] input, int inPos, long[] output, int outPos);
+}
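
As a concrete reading of the contract documented above: packing 8 values at bit width w consumes exactly w bytes, and unpacking those bytes restores the original values as long as each one fits in w bits. The sketch below (class name hypothetical, width and values arbitrary, LITTLE_ENDIAN chosen only for illustration) obtains a packer through the factory method added to Packer later in this patch.

    import java.util.Arrays;

    import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
    import org.apache.parquet.column.values.bitpacking.Packer;

    // Illustrative only: pack and unpack 8 small longs with a width-3 packer.
    public class BytePackerForLongExample {
      public static void main(String[] args) {
        BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(3);
        long[] values = { 0, 1, 2, 3, 4, 5, 6, 7 };      // every value fits in 3 bits
        byte[] packed = new byte[packer.getBitWidth()];  // 3 bytes hold 8 values
        packer.pack8Values(values, 0, packed, 0);

        long[] unpacked = new long[8];
        packer.unpack8Values(packed, 0, unpacked, 0);    // byte[] overload
        System.out.println(Arrays.toString(unpacked));   // [0, 1, 2, 3, 4, 5, 6, 7]
      }
    }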

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java
new file mode 100644
index 0000000..39086ac
--- /dev/null
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java
@@ -0,0 +1,25 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+public interface BytePackerForLongFactory {
+
+  BytePackerForLong newBytePackerForLong(int width);
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
index ed14edf..5c56941 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
@@ -39,6 +39,10 @@ public enum Packer {
     public BytePacker newBytePacker(int width) {
       return beBytePackerFactory.newBytePacker(width);
     }
+    @Override
+    public BytePackerForLong newBytePackerForLong(int width) {
+      return beBytePackerForLongFactory.newBytePackerForLong(width);
+    }
   },
 
   /**
@@ -54,6 +58,10 @@ public enum Packer {
     public BytePacker newBytePacker(int width) {
       return leBytePackerFactory.newBytePacker(width);
     }
+    @Override
+    public BytePackerForLong newBytePackerForLong(int width) {
+      return leBytePackerForLongFactory.newBytePackerForLong(width);
+    }
   };
 
   private static IntPackerFactory getIntPackerFactory(String name) {
@@ -64,6 +72,10 @@ public enum Packer {
     return (BytePackerFactory)getStaticField("org.apache.parquet.column.values.bitpacking." + name, "factory");
   }
 
+  private static BytePackerForLongFactory getBytePackerForLongFactory(String name) {
+    return (BytePackerForLongFactory)getStaticField("org.apache.parquet.column.values.bitpacking." + name, "factory");
+  }
+
   private static Object getStaticField(String className, String fieldName) {
     try {
       return Class.forName(className).getField(fieldName).get(null);
@@ -80,10 +92,12 @@ public enum Packer {
     }
   }
 
-  static BytePackerFactory beBytePackerFactory = getBytePackerFactory("ByteBitPackingBE");
   static IntPackerFactory beIntPackerFactory = getIntPackerFactory("LemireBitPackingBE");
-  static BytePackerFactory leBytePackerFactory = getBytePackerFactory("ByteBitPackingLE");
   static IntPackerFactory leIntPackerFactory = getIntPackerFactory("LemireBitPackingLE");
+  static BytePackerFactory beBytePackerFactory = getBytePackerFactory("ByteBitPackingBE");
+  static BytePackerFactory leBytePackerFactory = getBytePackerFactory("ByteBitPackingLE");
+  static BytePackerForLongFactory beBytePackerForLongFactory = getBytePackerForLongFactory("ByteBitPackingForLongBE");
+  static BytePackerForLongFactory leBytePackerForLongFactory = getBytePackerForLongFactory("ByteBitPackingForLongLE");
 
   /**
    * @param width the width in bits of the packed values
@@ -96,4 +110,10 @@ public enum Packer {
    * @return a byte based packer
    */
   public abstract BytePacker newBytePacker(int width);
+
+  /**
+   * @param width the width in bits of the packed values
+   * @return a byte based packer for INT64
+   */
+  public abstract BytePackerForLong newBytePackerForLong(int width);
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
index 64679e5..ce9b3ac 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
@@ -196,6 +196,20 @@ public class TestBitPacking {
     return sb.toString();
   }
 
+  public static String toString(long[] vals) {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (long i : vals) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(" ");
+      }
+      sb.append(i);
+    }
+    return sb.toString();
+  }
+
   public static String toString(byte[] bytes) {
     StringBuilder sb = new StringBuilder();
     boolean first = true;