You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2018/01/13 00:29:55 UTC

[3/3] parquet-mr git commit: PARQUET-1025: Support new min-max statistics in parquet-mr

PARQUET-1025: Support new min-max statistics in parquet-mr

Author: Gabor Szadovszky <ga...@cloudera.com>

Closes #435 from gszadovszky/PARQUET-1025 and squashes the following commits:

2a63fcf13 [Gabor Szadovszky] PARQUET-1025: Use constant instead of creating new TypeDefinedOrder instances
820df6fb7 [Gabor Szadovszky] PARQUET-1025: Minor fixes at data generation for TestStatistics
dc838f273 [Gabor Szadovszky] PARQUET-1025: Implement ColumnOrder; other updates for rdblue's findings
524750be0 [Gabor Szadovszky] PARQUET-1025: Some updates for zi's findings
a2ae97ce5 [Gabor Szadovszky] PARQUET-1025: Unified formatting/comments/deprecation
bc86e8a63 [Gabor Szadovszky] PARQUET-1025: Updates according to rdblue's comments
70e56a759 [Gabor Szadovszky] PARQUET-1025: Add explicit list of types for which statistics are not read/written
95199e5e0 [Gabor Szadovszky] PARQUET-1025: Use lexicographical comparison for Binary.compareTo. Also rename SIGNED_BINARY_COMPARATOR to a more descriptive name. Also added comments for the hex representation of values at unsigned comparison testing
2f28c2c0e [Gabor Szadovszky] PARQUET-1025: Finalize read/write stats updates
c5536a0a3 [Gabor Szadovszky] PARQUET-1025: Some modifications according to zi's comments
318e585d9 [Gabor Szadovszky] PARQUET-1025: Finalize reading/writing new stats; modify/implement unit tests accordingly
688ef2efe [Gabor Szadovszky] PARQUET-1025: Updates according to zi's and rdblue's comments
51bc1f827 [Gabor Szadovszky] PARQUET-1025: Add the proper comparators as required; revert Binary related changes
20b937f46 [Gabor Szadovszky] PARQUET-1025: reading/writing new min-max statistics; use the comparators as needed
52cd58f61 [Gabor Szadovszky] PARQUET-1025: Move comparators to Type
3378b6d34 [Gabor Szadovszky] PARQUET-1025: Implement comparators and use them with statistics
e1719bb3b [Gabor Szadovszky] PARQUET-1025: Refactor Binary to prepare for custom comparators


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/c6764c4a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/c6764c4a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/c6764c4a

Branch: refs/heads/master
Commit: c6764c4a0848abf1d581e22df8b33e28ee9f2ced
Parents: 4d996d1
Author: Gabor Szadovszky <ga...@cloudera.com>
Authored: Fri Jan 12 16:29:48 2018 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Fri Jan 12 16:29:48 2018 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/parquet/cli/Util.java  |  50 +--
 .../cli/commands/CheckParquet251Command.java    |  11 +-
 .../apache/parquet/column/ColumnDescriptor.java |  40 ++-
 .../parquet/column/impl/ColumnWriterV1.java     |   2 +-
 .../parquet/column/impl/ColumnWriterV2.java     |   2 +-
 .../column/statistics/BinaryStatistics.java     |  49 ++-
 .../column/statistics/BooleanStatistics.java    |  53 ++-
 .../column/statistics/DoubleStatistics.java     |  56 ++-
 .../column/statistics/FloatStatistics.java      |  57 ++-
 .../column/statistics/IntStatistics.java        |  57 ++-
 .../column/statistics/LongStatistics.java       |  57 ++-
 .../parquet/column/statistics/Statistics.java   | 195 +++++++++--
 .../statistics/StatisticsClassException.java    |  14 +-
 .../parquet/filter2/predicate/Statistics.java   |  36 ++
 ...ntallyUpdatedFilterPredicateBuilderBase.java |  19 +
 .../org/apache/parquet/io/MessageColumnIO.java  |   2 +-
 .../apache/parquet/io/PrimitiveColumnIO.java    |   7 +-
 .../java/org/apache/parquet/io/api/Binary.java  | 120 +------
 .../org/apache/parquet/schema/ColumnOrder.java  |  97 +++++
 .../org/apache/parquet/schema/MessageType.java  |   6 +-
 .../parquet/schema/PrimitiveComparator.java     | 290 +++++++++++++++
 .../apache/parquet/schema/PrimitiveType.java    | 187 +++++++++-
 .../java/org/apache/parquet/schema/Types.java   |  20 +-
 .../column/statistics/TestStatistics.java       |  28 ++
 .../org/apache/parquet/io/api/TestBinary.java   |  20 ++
 .../apache/parquet/schema/TestMessageType.java  |  45 +++
 .../parquet/schema/TestPrimitiveComparator.java | 311 ++++++++++++++++
 .../apache/parquet/schema/TestTypeBuilders.java |  47 +++
 ...mentallyUpdatedFilterPredicateGenerator.java |  43 +--
 .../dictionarylevel/DictionaryFilter.java       |  15 +-
 .../statisticslevel/StatisticsFilter.java       |  17 +-
 .../converter/ParquetMetadataConverter.java     | 137 ++++++--
 .../hadoop/ColumnChunkPageWriteStore.java       |  20 +-
 .../parquet/hadoop/ParquetFileWriter.java       |  22 +-
 .../hadoop/metadata/ColumnChunkMetaData.java    |  39 ++-
 .../hadoop/metadata/ColumnChunkProperties.java  |  30 +-
 .../converter/TestParquetMetadataConverter.java | 351 +++++++++++++++++--
 .../parquet/hadoop/TestParquetFileWriter.java   |   8 +-
 .../org/apache/parquet/hadoop/TestUtils.java    |  21 ++
 .../apache/parquet/statistics/RandomValues.java |  97 ++++-
 .../parquet/statistics/TestStatistics.java      | 196 +++++++----
 .../thrift/TestThriftToParquetFileWriter.java   |  23 +-
 42 files changed, 2398 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
index 07a5364..04b3901 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
@@ -29,10 +29,6 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.BooleanStatistics;
-import org.apache.parquet.column.statistics.DoubleStatistics;
-import org.apache.parquet.column.statistics.FloatStatistics;
-import org.apache.parquet.column.statistics.IntStatistics;
-import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
@@ -40,7 +36,6 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import java.nio.charset.StandardCharsets;
-import java.util.Locale;
 import java.util.Set;
 
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
@@ -96,34 +91,14 @@ public class Util {
       return "";
     }
     // TODO: use original types when showing decimal, timestamp, etc.
-    if (stats instanceof BooleanStatistics) {
-      return String.format("%s / %s",
-          ((BooleanStatistics) stats).getMin(),
-          ((BooleanStatistics) stats).getMax());
-    } else if (stats instanceof IntStatistics) {
-      return String.format("%d / %d",
-          ((IntStatistics) stats).getMin(),
-          ((IntStatistics) stats).getMax());
-    } else if (stats instanceof LongStatistics) {
-      return String.format("%d / %d",
-          ((LongStatistics) stats).getMin(),
-          ((LongStatistics) stats).getMax());
-    } else if (stats instanceof FloatStatistics) {
-      return String.format("%f / %f",
-          ((FloatStatistics) stats).getMin(),
-          ((FloatStatistics) stats).getMax());
-    } else if (stats instanceof DoubleStatistics) {
-      return String.format("%f / %f",
-          ((DoubleStatistics) stats).getMin(),
-          ((DoubleStatistics) stats).getMax());
-    } else if (stats instanceof BinaryStatistics) {
+    if (stats instanceof BinaryStatistics) {
       byte[] minBytes = stats.getMinBytes();
       byte[] maxBytes = stats.getMaxBytes();
       return String.format("%s / %s",
           printable(minBytes, annotation == OriginalType.UTF8, 30),
           printable(maxBytes, annotation == OriginalType.UTF8, 30));
     } else {
-      throw new RuntimeException("Unknown stats type: " + stats);
+      return String.format("%s / %s", stats.minAsString(), stats.maxAsString());
     }
   }
 
@@ -134,24 +109,6 @@ public class Util {
     // TODO: use original types when showing decimal, timestamp, etc.
     if (stats instanceof BooleanStatistics) {
       return String.format("nulls: %d/%d", stats.getNumNulls(), count);
-    } else if (stats instanceof IntStatistics) {
-      return String.format("min: %d max: %d nulls: %d/%d",
-          ((IntStatistics) stats).getMin(), ((IntStatistics) stats).getMax(),
-          stats.getNumNulls(), count);
-    } else if (stats instanceof LongStatistics) {
-      return String.format("min: %d max: %d nulls: %d/%d",
-          ((LongStatistics) stats).getMin(), ((LongStatistics) stats).getMax(),
-          stats.getNumNulls(), count);
-    } else if (stats instanceof FloatStatistics) {
-      return String.format("min: %f max: %f nulls: %d/%d",
-          ((FloatStatistics) stats).getMin(),
-          ((FloatStatistics) stats).getMax(),
-          stats.getNumNulls(), count);
-    } else if (stats instanceof DoubleStatistics) {
-      return String.format("min: %f max: %f nulls: %d/%d",
-          ((DoubleStatistics) stats).getMin(),
-          ((DoubleStatistics) stats).getMax(),
-          stats.getNumNulls(), count);
     } else if (stats instanceof BinaryStatistics) {
       byte[] minBytes = stats.getMinBytes();
       byte[] maxBytes = stats.getMaxBytes();
@@ -160,7 +117,8 @@ public class Util {
           printable(maxBytes, annotation == OriginalType.UTF8, 30),
           stats.getNumNulls(), count);
     } else {
-      throw new RuntimeException("Unknown stats type: " + stats);
+      return String.format("min: %s max: %s nulls: %d/%d",
+        stats.minAsString(), stats.maxAsString(), stats.getNumNulls(), count);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
index 8f60821..fbeebdf 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
@@ -53,6 +53,7 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
@@ -184,9 +185,11 @@ public class CheckParquet251Command extends BaseCommand {
     private final boolean hasNonNull;
     private final T min;
     private final T max;
+    private final Comparator<T> comparator;
 
     public StatsValidator(DataPage page) {
       Statistics<T> stats = getStatisticsFromPageHeader(page);
+      this.comparator = stats.comparator();
       this.hasNonNull = stats.hasNonNullValue();
       if (hasNonNull) {
         this.min = stats.genericGetMin();
@@ -199,10 +202,10 @@ public class CheckParquet251Command extends BaseCommand {
 
     public void validate(T value) {
       if (hasNonNull) {
-        if (min.compareTo(value) > 0) {
+        if (comparator.compare(min, value) > 0) {
           throw new BadStatsException("Min should be <= all values.");
         }
-        if (max.compareTo(value) < 0) {
+        if (comparator.compare(max, value) < 0) {
           throw new BadStatsException("Max should be >= all values.");
         }
       }
@@ -343,8 +346,8 @@ public class CheckParquet251Command extends BaseCommand {
 
       console.debug(String.format(
           "Validated stats min=%s max=%s nulls=%d for page=%s col=%s",
-          String.valueOf(stats.genericGetMin()),
-          String.valueOf(stats.genericGetMax()), stats.getNumNulls(), page,
+          stats.minAsString(),
+          stats.maxAsString(), stats.getNumNulls(), page,
           Arrays.toString(desc.getPath())));
     }
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
index 61f13a2..5f30cd0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
@@ -20,7 +20,9 @@ package org.apache.parquet.column;
 
 import java.util.Arrays;
 
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 
 /**
  * Describes a column's type as well as its position in its containing schema.
@@ -31,8 +33,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 public class ColumnDescriptor implements Comparable<ColumnDescriptor> {
 
   private final String[] path;
-  private final PrimitiveTypeName type;
-  private final int typeLength;
+  private final PrimitiveType type;
   private final int maxRep;
   private final int maxDef;
 
@@ -42,8 +43,10 @@ public class ColumnDescriptor implements Comparable<ColumnDescriptor> {
    * @param type the type of the field
    * @param maxRep the maximum repetition level for that path
    * @param maxDef the maximum definition level for that path
+   * @deprecated Use {@link #ColumnDescriptor(String[], PrimitiveType, int, int)}
    */
-  public ColumnDescriptor(String[] path, PrimitiveTypeName type, int maxRep, 
+  @Deprecated
+  public ColumnDescriptor(String[] path, PrimitiveTypeName type, int maxRep,
                           int maxDef) {
     this(path, type, 0, maxRep, maxDef);
   }
@@ -54,13 +57,23 @@ public class ColumnDescriptor implements Comparable<ColumnDescriptor> {
    * @param type the type of the field
    * @param maxRep the maximum repetition level for that path
    * @param maxDef the maximum definition level for that path
+   * @deprecated Use {@link #ColumnDescriptor(String[], PrimitiveType, int, int)}
    */
-  public ColumnDescriptor(String[] path, PrimitiveTypeName type, 
+  @Deprecated
+  public ColumnDescriptor(String[] path, PrimitiveTypeName type,
                           int typeLength, int maxRep, int maxDef) {
-    super();
+    this(path, new PrimitiveType(Type.Repetition.OPTIONAL, type, typeLength,""), maxRep, maxDef);
+  }
+
+  /**
+   * @param path the path to the leaf field in the schema
+   * @param type the type of the field
+   * @param maxRep the maximum repetition level for that path
+   * @param maxDef the maximum definition level for that path
+   */
+  public ColumnDescriptor(String[] path, PrimitiveType type, int maxRep, int maxDef) {
     this.path = path;
     this.type = type;
-    this.typeLength = typeLength;
     this.maxRep = maxRep;
     this.maxDef = maxDef;
   }
@@ -88,16 +101,27 @@ public class ColumnDescriptor implements Comparable<ColumnDescriptor> {
 
   /**
    * @return the type of that column
+   * @deprecated will be removed in 2.0.0. Use {@link #getPrimitiveType()} instead.
    */
+  @Deprecated
   public PrimitiveTypeName getType() {
-    return type;
+    return type.getPrimitiveTypeName();
   }
 
   /**
    * @return the size of the type
+   * @deprecated will be removed in 2.0.0. Use {@link #getPrimitiveType()} instead.
    **/
+  @Deprecated
   public int getTypeLength() {
-    return typeLength;
+    return type.getTypeLength();
+  }
+
+  /**
+   * @return the primitive type object of the column
+   */
+  public PrimitiveType getPrimitiveType() {
+    return type;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index c5b3884..e274c11 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -80,7 +80,7 @@ final class ColumnWriterV1 implements ColumnWriter {
   }
 
   private void resetStatistics() {
-    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+    this.statistics = Statistics.createStats(this.path.getPrimitiveType());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index c6fd91b..b50d663 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -77,7 +77,7 @@ final class ColumnWriterV2 implements ColumnWriter {
   }
 
   private void resetStatistics() {
-    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+    this.statistics = Statistics.createStats(path.getPrimitiveType());
   }
 
   private void definitionLevel(int definitionLevel) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
index c319b4a..a68285b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -19,12 +19,38 @@
 package org.apache.parquet.column.statistics;
 
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 
 public class BinaryStatistics extends Statistics<Binary> {
 
+  // A fake type object to be used to generate the proper comparator
+  private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+      .named("fake_binary_type");
+
   private Binary max;
   private Binary min;
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead
+   */
+  @Deprecated
+  public BinaryStatistics() {
+    this(DEFAULT_FAKE_TYPE);
+  }
+
+  BinaryStatistics(PrimitiveType type) {
+    super(type);
+  }
+
+  private BinaryStatistics(BinaryStatistics other) {
+    super(other.type());
+    if (other.hasNonNullValue()) {
+      initializeStats(other.min, other.max);
+    }
+    setNumNulls(other.getNumNulls());
+  }
+
   @Override
   public void updateStats(Binary value) {
     if (!this.hasNonNullValue()) {
@@ -68,18 +94,14 @@ public class BinaryStatistics extends Statistics<Binary> {
   }
 
   @Override
-  public boolean isSmallerThan(long size) {
-    return !hasNonNullValue() || ((min.length() + max.length()) < size);
+  String toString(Binary value) {
+    // TODO: have separate toString for different logical types?
+    return value == null ? "null" : value.toStringUsingUTF8();
   }
 
   @Override
-  public String toString() {
-    if (this.hasNonNullValue())
-      return String.format("min: %s, max: %s, num_nulls: %d", min.toStringUsingUTF8(), max.toStringUsingUTF8(), this.getNumNulls());
-   else if (!this.isEmpty())
-      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
-   else
-      return "no stats for this column";
+  public boolean isSmallerThan(long size) {
+    return !hasNonNullValue() || ((min.length() + max.length()) < size);
   }
 
   /**
@@ -87,8 +109,8 @@ public class BinaryStatistics extends Statistics<Binary> {
    */
   @Deprecated
   public void updateStats(Binary min_value, Binary max_value) {
-    if (min.compareTo(min_value) > 0) { min = min_value.copy(); }
-    if (max.compareTo(max_value) < 0) { max = max_value.copy(); }
+    if (comparator().compare(min, min_value) > 0) { min = min_value.copy(); }
+    if (comparator().compare(max, max_value) < 0) { max = max_value.copy(); }
   }
 
   /**
@@ -136,4 +158,9 @@ public class BinaryStatistics extends Statistics<Binary> {
     this.min = min;
     this.markAsNotEmpty();
   }
+
+  @Override
+  public BinaryStatistics copy() {
+    return new BinaryStatistics(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
index 22c2393..0e77b61 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
@@ -19,12 +19,38 @@
 package org.apache.parquet.column.statistics;
 
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 
 public class BooleanStatistics extends Statistics<Boolean> {
 
+  // A fake type object to be used to generate the proper comparator
+  private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN)
+      .named("fake_boolean_type");
+
   private boolean max;
   private boolean min;
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead
+   */
+  @Deprecated
+  public BooleanStatistics() {
+    this(DEFAULT_FAKE_TYPE);
+  }
+
+  BooleanStatistics(PrimitiveType type) {
+    super(type);
+  }
+
+  private BooleanStatistics(BooleanStatistics other) {
+    super(other.type());
+    if (other.hasNonNullValue()) {
+      initializeStats(other.min, other.max);
+    }
+    setNumNulls(other.getNumNulls());
+  }
+
   @Override
   public void updateStats(boolean value) {
     if (!this.hasNonNullValue()) {
@@ -66,19 +92,9 @@ public class BooleanStatistics extends Statistics<Boolean> {
     return !hasNonNullValue() || (2 < size);
   }
 
-  @Override
-  public String toString() {
-    if (this.hasNonNullValue())
-      return String.format("min: %b, max: %b, num_nulls: %d", min, max, this.getNumNulls());
-    else if(!this.isEmpty())
-      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
-    else  
-      return "no stats for this column";
-  }
-
   public void updateStats(boolean min_value, boolean max_value) {
-    if (min && !min_value) { min = min_value; }
-    if (!max && max_value) { max = max_value; }
+    if (comparator().compare(min, min_value) > 0) { min = min_value; }
+    if (comparator().compare(max, max_value) < 0) { max = max_value; }
   }
 
   public void initializeStats(boolean min_value, boolean max_value) {
@@ -97,6 +113,14 @@ public class BooleanStatistics extends Statistics<Boolean> {
     return max;
   }
 
+  public int compareMinToValue(boolean value) {
+    return comparator().compare(min, value);
+  }
+
+  public int compareMaxToValue(boolean value) {
+    return comparator().compare(max, value);
+  }
+
   public boolean getMax() {
     return max;
   }
@@ -110,4 +134,9 @@ public class BooleanStatistics extends Statistics<Boolean> {
     this.min = min;
     this.markAsNotEmpty();
   }
+
+  @Override
+  public BooleanStatistics copy() {
+    return new BooleanStatistics(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
index d67a550..0dd067b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
@@ -19,12 +19,38 @@
 package org.apache.parquet.column.statistics;
 
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 
 public class DoubleStatistics extends Statistics<Double> {
 
+  // A fake type object to be used to generate the proper comparator
+  private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+      .named("fake_double_type");
+
   private double max;
   private double min;
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead
+   */
+  @Deprecated
+  public DoubleStatistics() {
+    this(DEFAULT_FAKE_TYPE);
+  }
+
+  DoubleStatistics(PrimitiveType type) {
+    super(type);
+  }
+
+  private DoubleStatistics(DoubleStatistics other) {
+    super(other.type());
+    if (other.hasNonNullValue()) {
+      initializeStats(other.min, other.max);
+    }
+    setNumNulls(other.getNumNulls());
+  }
+
   @Override
   public void updateStats(double value) {
     if (!this.hasNonNullValue()) {
@@ -62,23 +88,18 @@ public class DoubleStatistics extends Statistics<Double> {
   }
 
   @Override
-  public boolean isSmallerThan(long size) {
-    return !hasNonNullValue() || (16 < size);
+  String toString(Double value) {
+    return String.format("%.5f", value);
   }
 
   @Override
-  public String toString() {
-    if(this.hasNonNullValue())
-      return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls());
-    else if (!this.isEmpty())
-      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
-    else
-      return "no stats for this column";
+  public boolean isSmallerThan(long size) {
+    return !hasNonNullValue() || (16 < size);
   }
 
   public void updateStats(double min_value, double max_value) {
-    if (min_value < min) { min = min_value; }
-    if (max_value > max) { max = max_value; }
+    if (comparator().compare(min, min_value) > 0) { min = min_value; }
+    if (comparator().compare(max, max_value) < 0) { max = max_value; }
   }
 
   public void initializeStats(double min_value, double max_value) {
@@ -97,6 +118,14 @@ public class DoubleStatistics extends Statistics<Double> {
     return max;
   }
 
+  public int compareMinToValue(double value) {
+    return comparator().compare(min, value);
+  }
+
+  public int compareMaxToValue(double value) {
+    return comparator().compare(max, value);
+  }
+
   public double getMax() {
     return max;
   }
@@ -110,4 +139,9 @@ public class DoubleStatistics extends Statistics<Double> {
     this.min = min;
     this.markAsNotEmpty();
   }
+
+  @Override
+  public DoubleStatistics copy() {
+    return new DoubleStatistics(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
index dffc207..36836c6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
@@ -19,12 +19,39 @@
 package org.apache.parquet.column.statistics;
 
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 
 public class FloatStatistics extends Statistics<Float> {
 
+  // A fake type object to be used to generate the proper comparator
+  private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.FLOAT)
+      .named("fake_float_type");
+
   private float max;
   private float min;
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead
+   */
+  @Deprecated
+  public FloatStatistics() {
+    // Creating a fake primitive type to have the proper comparator
+    this(DEFAULT_FAKE_TYPE);
+  }
+
+  FloatStatistics(PrimitiveType type) {
+    super(type);
+  }
+
+  private FloatStatistics(FloatStatistics other) {
+    super(other.type());
+    if (other.hasNonNullValue()) {
+      initializeStats(other.min, other.max);
+    }
+    setNumNulls(other.getNumNulls());
+  }
+
   @Override
   public void updateStats(float value) {
     if (!this.hasNonNullValue()) {
@@ -62,23 +89,18 @@ public class FloatStatistics extends Statistics<Float> {
   }
 
   @Override
-  public boolean isSmallerThan(long size) {
-    return !hasNonNullValue() || (8 < size);
+  String toString(Float value) {
+    return String.format("%.5f", value);
   }
 
   @Override
-  public String toString() {
-    if (this.hasNonNullValue())
-      return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls());
-    else if (!this.isEmpty())
-      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
-    else
-      return "no stats for this column";
+  public boolean isSmallerThan(long size) {
+    return !hasNonNullValue() || (8 < size);
   }
 
   public void updateStats(float min_value, float max_value) {
-    if (min_value < min) { min = min_value; }
-    if (max_value > max) { max = max_value; }
+    if (comparator().compare(min, min_value) > 0) { min = min_value; }
+    if (comparator().compare(max, max_value) < 0) { max = max_value; }
   }
 
   public void initializeStats(float min_value, float max_value) {
@@ -97,6 +119,14 @@ public class FloatStatistics extends Statistics<Float> {
     return max;
   }
 
+  public int compareMinToValue(float value) {
+    return comparator().compare(min, value);
+  }
+
+  public int compareMaxToValue(float value) {
+    return comparator().compare(max, value);
+  }
+
   public float getMax() {
     return max;
   }
@@ -110,4 +140,9 @@ public class FloatStatistics extends Statistics<Float> {
     this.min = min;
     this.markAsNotEmpty();
   }
+
+  @Override
+  public FloatStatistics copy() {
+    return new FloatStatistics(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
index a5d7ba1..5df7f0a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
@@ -19,12 +19,38 @@
 package org.apache.parquet.column.statistics;
 
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 
 public class IntStatistics extends Statistics<Integer> {
 
+  // A fake type object to be used to generate the proper comparator
+  private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.INT32)
+      .named("fake_int32_type");
+
   private int max;
   private int min;
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead
+   */
+  @Deprecated
+  public IntStatistics() {
+    this(DEFAULT_FAKE_TYPE);
+  }
+
+  IntStatistics(PrimitiveType type) {
+    super(type);
+  }
+
+  private IntStatistics(IntStatistics other) {
+    super(other.type());
+    if (other.hasNonNullValue()) {
+      initializeStats(other.min, other.max);
+    }
+    setNumNulls(other.getNumNulls());
+  }
+
   @Override
   public void updateStats(int value) {
     if (!this.hasNonNullValue()) {
@@ -62,23 +88,19 @@ public class IntStatistics extends Statistics<Integer> {
   }
 
   @Override
-  public boolean isSmallerThan(long size) {
-    return !hasNonNullValue() || (8 < size);
+  String toString(Integer value) {
+    // TODO: render the value as unsigned when the logical type is unsigned (e.g. UINT_32)
+    return value.toString();
   }
 
   @Override
-  public String toString() {
-    if (this.hasNonNullValue())
-      return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls());
-    else if (!this.isEmpty())
-      return String.format("num_nulls: %d, min/max is not defined", this.getNumNulls());
-    else
-      return "no stats for this column";
+  public boolean isSmallerThan(long size) {
+    return !hasNonNullValue() || (8 < size);
   }
 
   public void updateStats(int min_value, int max_value) {
-    if (min_value < min) { min = min_value; }
-    if (max_value > max) { max = max_value; }
+    if (comparator().compare(min, min_value) > 0) { min = min_value; }
+    if (comparator().compare(max, max_value) < 0) { max = max_value; }
   }
 
   public void initializeStats(int min_value, int max_value) {
@@ -97,6 +119,14 @@ public class IntStatistics extends Statistics<Integer> {
     return max;
   }
 
+  public int compareMinToValue(int value) {
+    return comparator().compare(min, value);
+  }
+
+  public int compareMaxToValue(int value) {
+    return comparator().compare(max, value);
+  }
+
   public int getMax() {
     return max;
   }
@@ -110,4 +140,9 @@ public class IntStatistics extends Statistics<Integer> {
     this.min = min;
     this.markAsNotEmpty();
   }
+
+  @Override
+  public IntStatistics copy() {
+    return new IntStatistics(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
index f7971ef..fd6d19c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
@@ -19,12 +19,38 @@
 package org.apache.parquet.column.statistics;
 
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 
 public class LongStatistics extends Statistics<Long> {
 
+  // A fake type object to be used to generate the proper comparator
+  private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.INT64)
+      .named("fake_int64_type");
+
   private long max;
   private long min;
 
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead
+   */
+  @Deprecated
+  public LongStatistics() {
+    this(DEFAULT_FAKE_TYPE);
+  }
+
+  LongStatistics(PrimitiveType type) {
+    super(type);
+  }
+
+  private LongStatistics(LongStatistics other) {
+    super(other.type());
+    if (other.hasNonNullValue()) {
+      initializeStats(other.min, other.max);
+    }
+    setNumNulls(other.getNumNulls());
+  }
+
   @Override
   public void updateStats(long value) {
     if (!this.hasNonNullValue()) {
@@ -62,23 +88,19 @@ public class LongStatistics extends Statistics<Long> {
   }
 
   @Override
-  public boolean isSmallerThan(long size) {
-    return !hasNonNullValue() || (16 < size);
+  String toString(Long value) {
+    // TODO: render the value as unsigned when the logical type is an unsigned 64-bit integer
+    return value.toString();
   }
 
   @Override
-  public String toString() {
-    if (this.hasNonNullValue())
-      return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls());
-    else if (!this.isEmpty())
-      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
-    else
-      return "no stats for this column";
+  public boolean isSmallerThan(long size) {
+    return !hasNonNullValue() || (16 < size);
   }
 
   public void updateStats(long min_value, long max_value) {
-    if (min_value < min) { min = min_value; }
-    if (max_value > max) { max = max_value; }
+    if (comparator().compare(min, min_value) > 0) { min = min_value; }
+    if (comparator().compare(max, max_value) < 0) { max = max_value; }
   }
 
   public void initializeStats(long min_value, long max_value) {
@@ -97,6 +119,14 @@ public class LongStatistics extends Statistics<Long> {
     return max;
   }
 
+  public int compareMinToValue(long value) {
+    return comparator().compare(min, value);
+  }
+
+  public int compareMaxToValue(long value) {
+    return comparator().compare(max, value);
+  }
+
   public long getMax() {
     return max;
   }
@@ -110,4 +140,9 @@ public class LongStatistics extends Statistics<Long> {
     this.min = min;
     this.markAsNotEmpty();
   }
+
+  @Override
+  public LongStatistics copy() {
+    return new LongStatistics(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
index 30153c0..6eb2381 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
@@ -18,10 +18,15 @@
  */
 package org.apache.parquet.column.statistics;
 
+import java.util.Arrays;
+import java.util.Objects;
+
 import org.apache.parquet.column.UnknownColumnTypeException;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import java.util.Arrays;
+import org.apache.parquet.schema.Type;
 
 
 /**
@@ -31,10 +36,14 @@ import java.util.Arrays;
  */
 public abstract class Statistics<T extends Comparable<T>> {
 
+  private final PrimitiveType type;
+  private final PrimitiveComparator<T> comparator;
   private boolean hasNonNullValue;
   private long num_nulls;
 
-  public Statistics() {
+  Statistics(PrimitiveType type) {
+    this.type = type;
+    this.comparator = type.comparator();
     hasNonNullValue = false;
     num_nulls = 0;
   }
@@ -43,27 +52,59 @@ public abstract class Statistics<T extends Comparable<T>> {
    * Returns the typed statistics object based on the passed type parameter
    * @param type PrimitiveTypeName type of the column
    * @return instance of a typed statistics class
+   * @deprecated Use {@link #createStats(Type)} instead
    */
+  @Deprecated
   public static Statistics getStatsBasedOnType(PrimitiveTypeName type) {
-    switch(type) {
-    case INT32:
-      return new IntStatistics();
-    case INT64:
-      return new LongStatistics();
-    case FLOAT:
-      return new FloatStatistics();
-    case DOUBLE:
-      return new DoubleStatistics();
-    case BOOLEAN:
-      return new BooleanStatistics();
-    case BINARY:
-      return new BinaryStatistics();
-    case INT96:
-      return new BinaryStatistics();
-    case FIXED_LEN_BYTE_ARRAY:
-      return new BinaryStatistics();
-    default:
-      throw new UnknownColumnTypeException(type);
+    switch (type) {
+      case INT32:
+        return new IntStatistics();
+      case INT64:
+        return new LongStatistics();
+      case FLOAT:
+        return new FloatStatistics();
+      case DOUBLE:
+        return new DoubleStatistics();
+      case BOOLEAN:
+        return new BooleanStatistics();
+      case BINARY:
+        return new BinaryStatistics();
+      case INT96:
+        return new BinaryStatistics();
+      case FIXED_LEN_BYTE_ARRAY:
+        return new BinaryStatistics();
+      default:
+        throw new UnknownColumnTypeException(type);
+    }
+  }
+
+  /**
+   * Creates an empty {@code Statistics} instance for the specified type to be
+   * used for reading/writing the new min/max statistics used in the V2 format.
+   *
+   * @param type
+   *          type of the column
+   * @return instance of a typed statistics class
+   */
+  public static Statistics<?> createStats(Type type) {
+    PrimitiveType primitive = type.asPrimitiveType();
+    switch (primitive.getPrimitiveTypeName()) {
+      case INT32:
+        return new IntStatistics(primitive);
+      case INT64:
+        return new LongStatistics(primitive);
+      case FLOAT:
+        return new FloatStatistics(primitive);
+      case DOUBLE:
+        return new DoubleStatistics(primitive);
+      case BOOLEAN:
+        return new BooleanStatistics(primitive);
+      case BINARY:
+      case INT96:
+      case FIXED_LEN_BYTE_ARRAY:
+        return new BinaryStatistics(primitive);
+      default:
+        throw new UnknownColumnTypeException(primitive.getPrimitiveTypeName());
     }
   }
 
@@ -127,9 +168,10 @@ public abstract class Statistics<T extends Comparable<T>> {
     if (!(other instanceof Statistics))
       return false;
     Statistics stats = (Statistics) other;
-    return Arrays.equals(stats.getMaxBytes(), this.getMaxBytes()) &&
-            Arrays.equals(stats.getMinBytes(), this.getMinBytes()) &&
-            stats.getNumNulls() == this.getNumNulls();
+    return type.equals(stats.type) &&
+        Arrays.equals(stats.getMaxBytes(), this.getMaxBytes()) &&
+        Arrays.equals(stats.getMinBytes(), this.getMinBytes()) &&
+        stats.getNumNulls() == this.getNumNulls();
   }
 
   /**
@@ -138,7 +180,8 @@ public abstract class Statistics<T extends Comparable<T>> {
    */
   @Override
   public int hashCode() {
-    return 31 * Arrays.hashCode(getMaxBytes()) + 17 * Arrays.hashCode(getMinBytes()) + Long.valueOf(this.getNumNulls()).hashCode();
+    return 31 * type.hashCode() + 31 * Arrays.hashCode(getMaxBytes()) + 17 * Arrays.hashCode(getMinBytes())
+        + Long.valueOf(this.getNumNulls()).hashCode();
   }
 
   /**
@@ -150,14 +193,15 @@ public abstract class Statistics<T extends Comparable<T>> {
   public void mergeStatistics(Statistics stats) {
     if (stats.isEmpty()) return;
 
-    if (this.getClass() == stats.getClass()) {
+    // Merge stats only if they have the same type
+    if (type.equals(stats.type)) {
       incrementNumNulls(stats.getNumNulls());
       if (stats.hasNonNullValue()) {
         mergeStatisticsMinMax(stats);
         markAsNotEmpty();
       }
     } else {
-      throw new StatisticsClassException(this.getClass().toString(), stats.getClass().toString());
+      throw StatisticsClassException.create(this, stats);
     }
   }
 
@@ -175,10 +219,59 @@ public abstract class Statistics<T extends Comparable<T>> {
    */
   abstract public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
 
+  /**
+   * Returns the min value in the statistics. The Java natural order of the returned type, as defined by
+   * {@link Comparable#compareTo(Object)}, might not be the proper one. For example, UINT_32 requires unsigned comparison
+   * instead of the natural signed one. Use {@link #compareMinToValue(Comparable)} or the comparator returned by
+   * {@link #comparator()} to always get the proper ordering.
+   */
   abstract public T genericGetMin();
+
+  /**
+   * Returns the max value in the statistics. The Java natural order of the returned type, as defined by
+   * {@link Comparable#compareTo(Object)}, might not be the proper one. For example, UINT_32 requires unsigned comparison
+   * instead of the natural signed one. Use {@link #compareMaxToValue(Comparable)} or the comparator returned by
+   * {@link #comparator()} to always get the proper ordering.
+   */
   abstract public T genericGetMax();
 
   /**
+   * Returns the {@link PrimitiveComparator} implementation to be used to compare two generic values in the proper way
+   * (for example, unsigned comparison for UINT_32).
+   */
+  public final PrimitiveComparator<T> comparator() {
+    return comparator;
+  }
+
+  /**
+   * Compares min to the specified value in the proper way. It does the same as invoking
+   * {@code comparator().compare(genericGetMin(), value)}. The corresponding statistics implementations overload this
+   * method so the one with the primitive argument shall be used to avoid boxing/unboxing.
+   *
+   * @param value
+   *          the value which {@code min} is to be compared to
+   * @return a negative integer, zero, or a positive integer as {@code min} is less than, equal to, or greater than
+   *         {@code value}.
+   */
+  public final int compareMinToValue(T value) {
+    return comparator.compare(genericGetMin(), value);
+  }
+
+  /**
+   * Compares max to the specified value in the proper way. It does the same as invoking
+   * {@code comparator().compare(genericGetMax(), value)}. The corresponding statistics implementations overload this
+   * method so the one with the primitive argument shall be used to avoid boxing/unboxing.
+   *
+   * @param value
+   *          the value which {@code max} is to be compared to
+   * @return a negative integer, zero, or a positive integer as {@code max} is less than, equal to, or greater than
+   *         {@code value}.
+   */
+  public final int compareMaxToValue(T value) {
+    return comparator.compare(genericGetMax(), value);
+  }
+
+  /**
    * Abstract method to return the max value as a byte array
    * @return byte array corresponding to the max value
    */
@@ -191,6 +284,24 @@ public abstract class Statistics<T extends Comparable<T>> {
   abstract public byte[] getMinBytes();
 
   /**
+   * Returns the string representation of min for debugging/logging purposes.
+   */
+  public String minAsString() {
+    return toString(genericGetMin());
+  }
+
+  /**
+   * Returns the string representation of max for debugging/logging purposes.
+   */
+  public String maxAsString() {
+    return toString(genericGetMax());
+  }
+
+  String toString(T value) {
+    return Objects.toString(value);
+  }
+
+  /**
    * Abstract method to return whether the min and max values fit in the given
    * size.
    * @param size a size in bytes
@@ -198,11 +309,15 @@ public abstract class Statistics<T extends Comparable<T>> {
    */
   abstract public boolean isSmallerThan(long size);
 
-  /**
-   * toString() to display min, max, num_nulls in a string
-   */
-  abstract public String toString();
-
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue())
+      return String.format("min: %s, max: %s, num_nulls: %d", minAsString(), maxAsString(), this.getNumNulls());
+    else if (!this.isEmpty())
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    else
+      return "no stats for this column";
+  }
 
   /**
    * Increments the null count by one
@@ -250,13 +365,25 @@ public abstract class Statistics<T extends Comparable<T>> {
   public boolean hasNonNullValue() {
     return hasNonNullValue;
   }
- 
+
   /**
    * Sets the page/column as having a valid non-null value
    * kind of misnomer here
-   */ 
+   */
   protected void markAsNotEmpty() {
     hasNonNullValue = true;
   }
+
+  /**
+   * @return a new independent statistics instance of this class.
+   */
+  public abstract Statistics<T> copy();
+
+  /**
+   * @return the primitive type object which this statistics is created for
+   */
+  public PrimitiveType type() {
+    return type;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
index a242737..4c23101 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
@@ -29,6 +29,18 @@ public class StatisticsClassException extends ParquetRuntimeException {
   private static final long serialVersionUID = 1L;
 
   public StatisticsClassException(String className1, String className2) {
-    super("Statistics classes mismatched: " + className1 + " vs. " + className2);
+    this("Statistics classes mismatched: " + className1 + " vs. " + className2);
+  }
+
+  private StatisticsClassException(String msg) {
+    super(msg);
+  }
+
+  static StatisticsClassException create(Statistics<?> stats1, Statistics<?> stats2) {
+    if (stats1.getClass() != stats2.getClass()) {
+      return new StatisticsClassException(stats1.getClass().toString(), stats2.getClass().toString());
+    }
+    return new StatisticsClassException(
+        "Statistics comparator mismatched: " + stats1.comparator() + " vs. " + stats2.comparator());
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java
index 22e4027..8df0250 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.filter2.predicate;
 
+import java.util.Comparator;
+
 import static org.apache.parquet.Preconditions.checkNotNull;
 
 /**
@@ -26,17 +28,51 @@ import static org.apache.parquet.Preconditions.checkNotNull;
 public class Statistics<T> {
   private final T min;
   private final T max;
+  private final Comparator<T> comparator;
 
+  // Intended for use only within Parquet itself.
+  /**
+   * @deprecated will be removed in 2.0.0. Use {@link #Statistics(Object, Object, Comparator)} instead
+   */
+  @Deprecated
   public Statistics(T min, T max) {
     this.min = checkNotNull(min, "min");
     this.max = checkNotNull(max, "max");
+    this.comparator = null;
   }
 
+  // Intended for use only within Parquet itself.
+  public Statistics(T min, T max, Comparator<T> comparator) {
+    this.min = checkNotNull(min, "min");
+    this.max = checkNotNull(max, "max");
+    this.comparator = checkNotNull(comparator, "comparator");
+  }
+
+  /**
+   * Returns the generic object representing the min value in the statistics. The
+   * natural ordering of type {@code T} defined by the {@code compareTo} method
+   * might not be appropriate for the actual logical type. Use
+   * {@link #getComparator()} for comparing.
+   */
   public T getMin() {
     return min;
   }
 
+  /**
+   * Returns the generic object representing the max value in the statistics. The
+   * natural ordering of type {@code T} defined by the {@code compareTo} method
+   * might not be appropriate for the actual logical type. Use
+   * {@link #getComparator()} for comparing.
+   */
   public T getMax() {
     return max;
   }
+
+  /**
+   * Returns the comparator to be used to compare two generic values in the proper way (e.g. unsigned comparison for
+   * UINT_32)
+   */
+  public Comparator<T> getComparator() {
+    return comparator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
index 8def88e..c1f759c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
@@ -30,6 +31,8 @@ import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Not;
 import org.apache.parquet.filter2.predicate.Operators.Or;
 import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.io.PrimitiveColumnIO;
+import org.apache.parquet.schema.PrimitiveComparator;
 
 import static org.apache.parquet.Preconditions.checkArgument;
 
@@ -55,9 +58,20 @@ import static org.apache.parquet.Preconditions.checkArgument;
 public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
   private boolean built = false;
   private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<ColumnPath, List<ValueInspector>>();
+  private final Map<ColumnPath, PrimitiveComparator<?>> comparatorsByColumn = new HashMap<>();
 
+  @Deprecated
   public IncrementallyUpdatedFilterPredicateBuilderBase() { }
 
+  public IncrementallyUpdatedFilterPredicateBuilderBase(List<PrimitiveColumnIO> leaves) {
+    for (PrimitiveColumnIO leaf : leaves) {
+      ColumnDescriptor descriptor = leaf.getColumnDescriptor();
+      ColumnPath path = ColumnPath.get(descriptor.getPath());
+      PrimitiveComparator<?> comparator = descriptor.getPrimitiveType().comparator();
+      comparatorsByColumn.put(path, comparator);
+    }
+  }
+
   public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
     checkArgument(!built, "This builder has already been used");
     IncrementallyUpdatedFilterPredicate incremental = pred.accept(this);
@@ -78,6 +92,11 @@ public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements
     return valueInspectorsByColumn;
   }
 
+  @SuppressWarnings("unchecked")
+  protected final <T> PrimitiveComparator<T> getComparator(ColumnPath path) {
+    return (PrimitiveComparator<T>) comparatorsByColumn.get(path);
+  }
+
   @Override
   public final IncrementallyUpdatedFilterPredicate visit(And and) {
     return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index 67efdb3..7346c5a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -109,7 +109,7 @@ public class MessageColumnIO extends GroupColumnIO {
       public RecordReader<T> visit(FilterPredicateCompat filterPredicateCompat) {
 
         FilterPredicate predicate = filterPredicateCompat.getFilterPredicate();
-        IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder();
+        IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder(leaves);
         IncrementallyUpdatedFilterPredicate streamingPredicate = builder.build(predicate);
         RecordMaterializer<T> filteringRecordMaterializer = new FilteringRecordMaterializer<T>(
             recordMaterializer,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
index 15c28c8..e40b24f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
@@ -52,10 +52,9 @@ public class PrimitiveColumnIO extends ColumnIO {
     super.setLevels(r, d, fieldPath, fieldIndexPath, repetition, path);
     PrimitiveType type = getType().asPrimitiveType();
     this.columnDescriptor = new ColumnDescriptor(
-        fieldPath, 
-        type.getPrimitiveTypeName(),
-        type.getTypeLength(),
-        getRepetitionLevel(), 
+        fieldPath,
+        type,
+        getRepetitionLevel(),
         getDefinitionLevel());
     this.path = path.toArray(new ColumnIO[path.size()]);
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index 50b98c2..9f5f0f2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.PrimitiveComparator;
 
 import static org.apache.parquet.bytes.BytesUtils.UTF8;
 
@@ -71,12 +72,14 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   abstract boolean equals(Binary other);
 
+  /**
+   * @deprecated will be removed in 2.0.0. The comparison logic depends on the related logical type, therefore this one
+   * might not be correct. The {@link java.util.Comparator} implementation for the related type, available at
+   * {@link org.apache.parquet.schema.PrimitiveType#comparator()}, shall be used instead.
+   */
+  @Deprecated
   abstract public int compareTo(Binary other);
 
-  abstract int compareTo(byte[] bytes, int offset, int length);
-
-  abstract int compareTo(ByteBuffer bytes, int offset, int length);
-
   abstract public ByteBuffer toByteBuffer();
 
   @Override
@@ -189,17 +192,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
     @Override
     public int compareTo(Binary other) {
-      return other.compareTo(value, offset, length);
-    }
-
-    @Override
-    int compareTo(byte[] other, int otherOffset, int otherLength) {
-      return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength);
-    }
-
-    @Override
-    int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
-      return Binary.compareByteArrayToByteBuffer(value, offset, length, bytes, otherOffset, otherLength);
+      return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR.compare(this, other);
     }
 
     @Override
@@ -345,20 +338,10 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
     @Override
     public int compareTo(Binary other) {
-      return other.compareTo(value, 0, value.length);
+      return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR.compare(this, other);
     }
 
-    @Override
-    int compareTo(byte[] other, int otherOffset, int otherLength) {
-      return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength);
-    }
-
-    @Override
-    int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
-      return Binary.compareByteArrayToByteBuffer(value, 0, value.length, bytes, otherOffset, otherLength);
-    }
-
-    @Override
+   @Override
     public ByteBuffer toByteBuffer() {
       return ByteBuffer.wrap(value);
     }
@@ -505,31 +488,12 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
     @Override
     public int compareTo(Binary other) {
-      if (value.hasArray()) {
-        return other.compareTo(value.array(), value.arrayOffset() + offset, length);
-      } else {
-        return other.compareTo(value, offset, length);
-      }
-    }
-
-    @Override
-    int compareTo(byte[] other, int otherOffset, int otherLength) {
-      if (value.hasArray()) {
-        return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + offset, length,
-            other, otherOffset, otherLength);
-      } {
-        return Binary.compareByteBufferToByteArray(value, offset, length, other, otherOffset, otherLength);
-      }
-    }
-
-    @Override
-    int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
-      return Binary.compareTwoByteBuffers(value, offset, length, bytes, otherOffset, otherLength);
+      return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR.compare(this, other);
     }
 
     @Override
     public ByteBuffer toByteBuffer() {
-      ByteBuffer ret = value.slice();
+      ByteBuffer ret = value.duplicate();
       ret.position(offset);
       ret.limit(offset + length);
       return ret;
@@ -665,64 +629,4 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     }
     return true;
   }
-
-  private static final int compareByteBufferToByteArray(ByteBuffer buf, int offset1, int length1,
-                                                        byte[] array, int offset2, int length2) {
-    return -1 * Binary.compareByteArrayToByteBuffer(array, offset1, length1, buf, offset2, length2);
-  }
-
-  private static final int compareByteArrayToByteBuffer(byte[] array1, int offset1, int length1,
-                                                        ByteBuffer buf, int offset2, int length2) {
-    if (array1 == null && buf == null) return 0;
-    int min_length = (length1 < length2) ? length1 : length2;
-    for (int i = 0; i < min_length; i++) {
-      if (array1[i + offset1] < buf.get(i + offset2)) {
-        return 1;
-      }
-      if (array1[i + offset1] > buf.get(i + offset2)) {
-        return -1;
-      }
-    }
-    // check remainder
-    if (length1 == length2) { return 0; }
-    else if (length1 < length2) { return 1;}
-    else { return -1; }
-  }
-
-  private static final int compareTwoByteBuffers(ByteBuffer buf1, int offset1, int length1,
-                                                        ByteBuffer buf2, int offset2, int length2) {
-    if (buf1 == null && buf2 == null) return 0;
-    int min_length = (length1 < length2) ? length1 : length2;
-    for (int i = 0; i < min_length; i++) {
-      if (buf1.get(i + offset1) < buf2.get(i + offset2)) {
-        return 1;
-      }
-      if (buf1.get(i + offset1) > buf2.get(i + offset2)) {
-        return -1;
-      }
-    }
-    // check remainder
-    if (length1 == length2) { return 0; }
-    else if (length1 < length2) { return 1;}
-    else { return -1; }
-  }
-
-  private static final int compareTwoByteArrays(byte[] array1, int offset1, int length1,
-                                                byte[] array2, int offset2, int length2) {
-    if (array1 == null && array2 == null) return 0;
-    if (array1 == array2 && offset1 == offset2 && length1 == length2) return 0;
-    int min_length = (length1 < length2) ? length1 : length2;
-    for (int i = 0; i < min_length; i++) {
-      if (array1[i + offset1] < array2[i + offset2]) {
-        return 1;
-      }
-      if (array1[i + offset1] > array2[i + offset2]) {
-        return -1;
-      }
-    }
-    // check remainder
-    if (length1 == length2) { return 0; }
-    else if (length1 < length2) { return 1;}
-    else { return -1; }
-  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java
new file mode 100644
index 0000000..144a93a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.Preconditions;
+
+/**
+ * Class representing the column order with all the related parameters.
+ */
+public class ColumnOrder {
+  /**
+   * The enum type of the column order.
+   */
+  public enum ColumnOrderName {
+    /**
+     * Representing the case when the defined column order is undefined (e.g. the file is written by a later API and the
+     * current one does not support the related column order). No statistics will be written/read in this case.
+     */
+    UNDEFINED,
+    /**
+     * Type defined order, meaning that the comparison order of the elements is based on their type.
+     */
+    TYPE_DEFINED_ORDER
+  }
+
+  private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED);
+  private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER);
+
+  /**
+   * @return a {@link ColumnOrder} instance representing an undefined order
+   * @see ColumnOrderName#UNDEFINED
+   */
+  public static ColumnOrder undefined() {
+    return UNDEFINED_COLUMN_ORDER;
+  }
+
+  /**
+   * @return a {@link ColumnOrder} instance representing a type defined order
+   * @see ColumnOrderName#TYPE_DEFINED_ORDER
+   */
+  public static ColumnOrder typeDefined() {
+    return TYPE_DEFINED_COLUMN_ORDER;
+  }
+
+  private final ColumnOrderName columnOrderName;
+
+  private ColumnOrder(ColumnOrderName columnOrderName) {
+    this.columnOrderName = Preconditions.checkNotNull(columnOrderName, "columnOrderName");
+  }
+
+  public ColumnOrderName getColumnOrderName() {
+    return columnOrderName;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ColumnOrder) {
+      return columnOrderName == ((ColumnOrder) obj).columnOrderName;
+    }
+    return false;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int hashCode() {
+    return columnOrderName.hashCode();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    return columnOrderName.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
index 1e26ed2..afbc416 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
@@ -95,8 +95,7 @@ public final class MessageType extends GroupType {
     int maxRep = getMaxRepetitionLevel(path);
     int maxDef = getMaxDefinitionLevel(path);
     PrimitiveType type = getType(path).asPrimitiveType();
-    return new ColumnDescriptor(path, type.getPrimitiveTypeName(),
-                                type.getTypeLength(), maxRep, maxDef);
+    return new ColumnDescriptor(path, type, maxRep, maxDef);
   }
 
   public List<String[]> getPaths() {
@@ -111,8 +110,7 @@ public final class MessageType extends GroupType {
       PrimitiveType primitiveType = getType(path).asPrimitiveType();
       columns.add(new ColumnDescriptor(
                       path,
-                      primitiveType.getPrimitiveTypeName(),
-                      primitiveType.getTypeLength(),
+                      primitiveType,
                       getMaxRepetitionLevel(path),
                       getMaxDefinitionLevel(path)));
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
new file mode 100644
index 0000000..085a67a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.io.api.Binary;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * {@link Comparator} implementation that also supports the comparison of the related primitive type to avoid the
+ * performance penalty of boxing/unboxing. The {@code compare} methods for the unsupported primitive types throw
+ * {@link UnsupportedOperationException}.
+ */
+public abstract class PrimitiveComparator<T> implements Comparator<T> {
+
+  public int compare(boolean b1, boolean b2) {
+    throw new UnsupportedOperationException(
+        "compare(boolean, boolean) was called on a non-boolean comparator: " + toString());
+  }
+
+  public int compare(int i1, int i2) {
+    throw new UnsupportedOperationException("compare(int, int) was called on a non-int comparator: " + toString());
+  }
+
+  public int compare(long l1, long l2) {
+    throw new UnsupportedOperationException("compare(long, long) was called on a non-long comparator: " + toString());
+  }
+
+  public int compare(float f1, float f2) {
+    throw new UnsupportedOperationException(
+        "compare(float, float) was called on a non-float comparator: " + toString());
+  }
+
+  public int compare(double d1, double d2) {
+    throw new UnsupportedOperationException(
+        "compare(double, double) was called on a non-double comparator: " + toString());
+  }
+
+  @Override
+  public final int compare(T o1, T o2) {
+    if (o1 == null) {
+      return o2 == null ? 0 : -1;
+    }
+    return o2 == null ? 1 : compareNotNulls(o1, o2);
+  }
+
+  abstract int compareNotNulls(T o1, T o2);
+
+  static final PrimitiveComparator<Boolean> BOOLEAN_COMPARATOR = new PrimitiveComparator<Boolean>() {
+    @Override
+    int compareNotNulls(Boolean o1, Boolean o2) {
+      return compare(o1.booleanValue(), o2.booleanValue());
+    }
+
+    @Override
+    public int compare(boolean b1, boolean b2) {
+      return Boolean.compare(b1, b2);
+    }
+
+    @Override
+    public String toString() {
+      return "BOOLEAN_COMPARATOR";
+    }
+  };
+
+  private static abstract class IntComparator extends PrimitiveComparator<Integer> {
+    @Override
+    int compareNotNulls(Integer o1, Integer o2) {
+      return compare(o1.intValue(), o2.intValue());
+    }
+  }
+
+  static final PrimitiveComparator<Integer> SIGNED_INT32_COMPARATOR = new IntComparator() {
+    @Override
+    public int compare(int i1, int i2) {
+      return Integer.compare(i1, i2);
+    }
+
+    @Override
+    public String toString() {
+      return "SIGNED_INT32_COMPARATOR";
+    }
+  };
+
+  static final PrimitiveComparator<Integer> UNSIGNED_INT32_COMPARATOR = new IntComparator() {
+    @Override
+    public int compare(int i1, int i2) {
+      // Implemented based on com.google.common.primitives.UnsignedInts.compare(int, int)
+      return Integer.compare(i1 ^ Integer.MIN_VALUE, i2 ^ Integer.MIN_VALUE);
+    }
+
+    @Override
+    public String toString() {
+      return "UNSIGNED_INT32_COMPARATOR";
+    }
+  };
+
+  private static abstract class LongComparator extends PrimitiveComparator<Long> {
+    @Override
+    int compareNotNulls(Long o1, Long o2) {
+      return compare(o1.longValue(), o2.longValue());
+    }
+  }
+
+  static final PrimitiveComparator<Long> SIGNED_INT64_COMPARATOR = new LongComparator() {
+    @Override
+    public int compare(long l1, long l2) {
+      return Long.compare(l1, l2);
+    }
+
+    @Override
+    public String toString() {
+      return "SIGNED_INT64_COMPARATOR";
+    }
+  };
+
+  static final PrimitiveComparator<Long> UNSIGNED_INT64_COMPARATOR = new LongComparator() {
+    @Override
+    public int compare(long l1, long l2) {
+      // Implemented based on com.google.common.primitives.UnsignedLongs.compare(long, long)
+      return Long.compare(l1 ^ Long.MIN_VALUE, l2 ^ Long.MIN_VALUE);
+    }
+
+    @Override
+    public String toString() {
+      return "UNSIGNED_INT64_COMPARATOR";
+    }
+  };
+
+  static final PrimitiveComparator<Float> FLOAT_COMPARATOR = new PrimitiveComparator<Float>() {
+    @Override
+    int compareNotNulls(Float o1, Float o2) {
+      return compare(o1.floatValue(), o2.floatValue());
+    }
+
+    @Override
+    public int compare(float f1, float f2) {
+      return Float.compare(f1, f2);
+    }
+
+    @Override
+    public String toString() {
+      return "FLOAT_COMPARATOR";
+    }
+  };
+
+  static final PrimitiveComparator<Double> DOUBLE_COMPARATOR = new PrimitiveComparator<Double>() {
+    @Override
+    int compareNotNulls(Double o1, Double o2) {
+      return compare(o1.doubleValue(), o2.doubleValue());
+    }
+
+    @Override
+    public int compare(double d1, double d2) {
+      return Double.compare(d1, d2);
+    }
+
+    @Override
+    public String toString() {
+      return "DOUBLE_COMPARATOR";
+    }
+  };
+
+  private static abstract class BinaryComparator extends PrimitiveComparator<Binary> {
+    @Override
+    int compareNotNulls(Binary o1, Binary o2) {
+      return compare(o1.toByteBuffer(), o2.toByteBuffer());
+    }
+
+    abstract int compare(ByteBuffer b1, ByteBuffer b2);
+
+    final int toUnsigned(byte b) {
+      return b & 0xFF;
+    }
+  }
+
+  public static final PrimitiveComparator<Binary> UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR = new BinaryComparator() {
+    @Override
+    int compare(ByteBuffer b1, ByteBuffer b2) {
+      int l1 = b1.remaining();
+      int l2 = b2.remaining();
+      int p1 = b1.position();
+      int p2 = b2.position();
+      int minL = Math.min(l1, l2);
+      // Walk the common prefix with absolute gets, comparing each byte as an unsigned value (0..255)
+      for (int i = 0; i < minL; ++i) {
+        int result = unsignedCompare(b1.get(p1 + i), b2.get(p2 + i));
+        if (result != 0) {
+          return result;
+        }
+      }
+      // The common prefix is identical: the shorter buffer sorts first, equal lengths compare as 0
+      return l1 - l2;
+    }
+
+    private int unsignedCompare(byte b1, byte b2) {
+      return toUnsigned(b1) - toUnsigned(b2);
+    }
+
+    @Override
+    public String toString() {
+      return "UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR";
+    }
+  };
+
+  /*
+   * This comparator compares two signed decimal values represented in two's complement binary. If the binary
+   * representation of one value is shorter than the other, it is treated as if padded with the corresponding prefix
+   * (0xFF for negative, 0x00 for positive values).
+   */
+  static final PrimitiveComparator<Binary> BINARY_AS_SIGNED_INTEGER_COMPARATOR = new BinaryComparator() {
+    private static final int NEGATIVE_PADDING = 0xFF;
+    private static final int POSITIVE_PADDING = 0;
+
+    @Override
+    int compare(ByteBuffer b1, ByteBuffer b2) {
+      int l1 = b1.remaining();
+      int l2 = b2.remaining();
+      int p1 = b1.position();
+      int p2 = b2.position();
+      // The sign comes from the top bit of the first byte; an empty buffer counts as non-negative
+      boolean isNegative1 = l1 > 0 ? b1.get(p1) < 0 : false;
+      boolean isNegative2 = l2 > 0 ? b2.get(p2) < 0 : false;
+      if (isNegative1 != isNegative2) {
+        return isNegative1 ? -1 : 1;
+      }
+
+      int result = 0;
+
+      // Compare the extra leading bytes of the longer buffer with the sign padding of the shorter one
+      if (l1 < l2) {
+        int lengthDiff = l2 - l1;
+        result = -compareWithPadding(lengthDiff, b2, p2, isNegative1 ? NEGATIVE_PADDING : POSITIVE_PADDING);
+        p2 += lengthDiff;
+      } else if (l1 > l2) {
+        int lengthDiff = l1 - l2;
+        result = compareWithPadding(lengthDiff, b1, p1, isNegative2 ? NEGATIVE_PADDING : POSITIVE_PADDING);
+        p1 += lengthDiff;
+      }
+      // The extra leading bytes equal the padding or the lengths are equal: compare the common tail.
+      // Only min(l1, l2) bytes remain in both buffers; reading l1 bytes would overrun b2 when l1 > l2.
+      if (result == 0) {
+        result = compare(Math.min(l1, l2), b1, p1, b2, p2);
+      }
+      return result;
+    }
+
+    private int compareWithPadding(int length, ByteBuffer b, int p, int paddingByte) {
+      for (int i = p, n = p + length; i < n; ++i) {
+        int result = toUnsigned(b.get(i)) - paddingByte;
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    }
+
+    private int compare(int length, ByteBuffer b1, int p1, ByteBuffer b2, int p2) {
+      for (int i = 0; i < length; ++i) {
+        int result = toUnsigned(b1.get(p1 + i)) - toUnsigned(b2.get(p2 + i));
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    }
+
+    @Override
+    public String toString() {
+      return "BINARY_AS_SIGNED_INTEGER_COMPARATOR";
+    }
+  };
+}