You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/17 10:58:01 UTC

[jira] [Commented] (PARQUET-1211) Column indexes: read/write API

    [ https://issues.apache.org/jira/browse/PARQUET-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478892#comment-16478892 ] 

ASF GitHub Bot commented on PARQUET-1211:
-----------------------------------------

zivanfi closed pull request #456: PARQUET-1211: Column indexes: read/write API
URL: https://github.com/apache/parquet-mr/pull/456
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 39b65da9f..f965511da 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -163,7 +163,7 @@ public ColumnWriteStore newColumnWriteStore(MessageType schema,
                                               PageWriteStore pageStore) {
     switch (writerVersion) {
     case PARQUET_1_0:
-      return new ColumnWriteStoreV1(pageStore, this);
+      return new ColumnWriteStoreV1(schema, pageStore, this);
     case PARQUET_2_0:
       return new ColumnWriteStoreV2(schema, pageStore, this);
     default:
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
new file mode 100644
index 000000000..d04192fbf
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Base implementation for {@link ColumnWriteStore} to be extended to specialize for V1 and V2 pages.
+ */
+abstract class ColumnWriteStoreBase implements ColumnWriteStore {
+
+  // Used to support the deprecated workflow of ColumnWriteStoreV1 (lazy init of ColumnWriters)
+  private interface ColumnWriterProvider {
+    ColumnWriter getColumnWriter(ColumnDescriptor path);
+  }
+
+  private final ColumnWriterProvider columnWriterProvider;
+
+  // will flush even if size below the threshold by this much to facilitate page alignment
+  private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
+
+  private final Map<ColumnDescriptor, ColumnWriterBase> columns;
+  private final ParquetProperties props;
+  private final long thresholdTolerance;
+  private long rowCount;
+  private long rowCountForNextSizeCheck;
+
+  // To be used by the deprecated constructor of ColumnWriteStoreV1
+  @Deprecated
+  ColumnWriteStoreBase(
+      final PageWriteStore pageWriteStore,
+      final ParquetProperties props) {
+    this.props = props;
+    // THRESHOLD_TOLERANCE_RATIO of the page size threshold, truncated to a long
+    this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+
+    this.columns = new TreeMap<>(); // stays mutable: the provider below registers writers on first request
+
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck(); // row count at which endRecord() first calls sizeCheck()
+    // No schema is available here, so column writers are created lazily, one per requested descriptor
+    columnWriterProvider = new ColumnWriterProvider() {
+      @Override
+      public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+        ColumnWriterBase column = columns.get(path);
+        if (column == null) { // no writer cached for this column yet
+          column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
+          columns.put(path, column);
+        }
+        return column;
+      }
+    };
+  }
+
+  ColumnWriteStoreBase( // eager variant: builds one writer per column of the given schema up front
+      MessageType schema,
+      PageWriteStore pageWriteStore,
+      ParquetProperties props) {
+    this.props = props;
+    this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+    Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+      mcolumns.put(path, createColumnWriter(path, pageWriter, props));
+    }
+    this.columns = unmodifiableMap(mcolumns); // the set of writers is fixed after construction
+
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck(); // row count at which endRecord() first calls sizeCheck()
+    // Plain lookup: unlike the deprecated constructor, nothing is created on demand
+    columnWriterProvider = new ColumnWriterProvider() {
+      @Override
+      public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+        return columns.get(path); // null when no writer is mapped for this descriptor
+      }
+    };
+  }
+
+  abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
+
+  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+    return columnWriterProvider.getColumnWriter(path);
+  }
+
+  public Set<ColumnDescriptor> getColumnDescriptors() {
+    return columns.keySet();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Entry<ColumnDescriptor, ColumnWriterBase> entry : columns.entrySet()) {
+      sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+      sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
+      sb.append("\n");
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    long total = 0;
+    for (ColumnWriterBase memColumn : columns.values()) {
+      total += memColumn.allocatedSize();
+    }
+    return total;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    long total = 0;
+    for (ColumnWriterBase memColumn : columns.values()) {
+      total += memColumn.getTotalBufferedSize();
+    }
+    return total;
+  }
+
+  @Override
+  public void flush() { // writes any pending page of each column, then finalizes each column chunk
+    for (ColumnWriterBase memColumn : columns.values()) {
+      long rows = rowCount - memColumn.getRowsWrittenSoFar(); // rows ended since this column's last page
+      if (rows > 0) {
+        memColumn.writePage(rowCount);
+      }
+      memColumn.finalizeColumnChunk(); // runs for every column, even when no page was pending
+    }
+  }
+
+  public String memUsageString() {
+    StringBuilder b = new StringBuilder("Store {\n");
+    for (ColumnWriterBase memColumn : columns.values()) {
+      b.append(memColumn.memUsageString(" "));
+    }
+    b.append("}\n");
+    return b.toString();
+  }
+
+  public long maxColMemSize() {
+    long max = 0;
+    for (ColumnWriterBase memColumn : columns.values()) {
+      max = Math.max(max, memColumn.getBufferedSizeInMemory());
+    }
+    return max;
+  }
+
+  @Override
+  public void close() {
+    flush(); // calling flush() here to keep it consistent with the behavior before merging with master
+    for (ColumnWriterBase memColumn : columns.values()) {
+      memColumn.close();
+    }
+  }
+
+  @Override
+  public void endRecord() {
+    ++rowCount;
+    if (rowCount >= rowCountForNextSizeCheck) {
+      sizeCheck();
+    }
+  }
+
+  // Flushes each column whose current page is within thresholdTolerance of the page size
+  // threshold, then picks the row count at which endRecord() will run the next check.
+  private void sizeCheck() {
+    long minRecordToWait = Long.MAX_VALUE;
+    for (ColumnWriterBase writer : columns.values()) {
+      long usedMem = writer.getCurrentPageBufferedSize();
+      long rows = rowCount - writer.getRowsWrittenSoFar();
+      long remainingMem = props.getPageSizeThreshold() - usedMem;
+      if (remainingMem <= thresholdTolerance) {
+        writer.writePage(rowCount);
+        remainingMem = props.getPageSizeThreshold();
+      }
+      // Multiply before dividing: the previous "(long) ((float) rows) / usedMem * remainingMem"
+      // cast only `rows`, so the long division truncated (to 0 whenever rows < usedMem).
+      long rowsToFillPage = usedMem == 0
+          ? props.getMaxRowCountForPageSizeCheck()
+          : rows * remainingMem / usedMem;
+      minRecordToWait = min(minRecordToWait, rowsToFillPage);
+    }
+    if (minRecordToWait == Long.MAX_VALUE) {
+      minRecordToWait = props.getMinRowCountForPageSizeCheck();
+    }
+
+    if (props.estimateNextSizeCheck()) {
+      // will check again halfway if between min and max
+      rowCountForNextSizeCheck = rowCount +
+          min(max(minRecordToWait / 2, props.getMinRowCountForPageSizeCheck()),
+              props.getMaxRowCountForPageSizeCheck());
+    } else {
+      rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
+    }
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index 93a497fad..7258423fb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -18,121 +18,26 @@
  */
 package org.apache.parquet.column.impl;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.schema.MessageType;
 
-public class ColumnWriteStoreV1 implements ColumnWriteStore {
-
-  private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
-  private final PageWriteStore pageWriteStore;
-  private final ParquetProperties props;
-
-  public ColumnWriteStoreV1(PageWriteStore pageWriteStore,
-                            ParquetProperties props) {
-    this.pageWriteStore = pageWriteStore;
-    this.props = props;
-  }
-
-  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    ColumnWriterV1 column = columns.get(path);
-    if (column == null) {
-      column = newMemColumn(path);
-      columns.put(path, column);
-    }
-    return column;
-  }
-
-  public Set<ColumnDescriptor> getColumnDescriptors() {
-    return columns.keySet();
-  }
-
-  private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
-    PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-    return new ColumnWriterV1(path, pageWriter, props);
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterV1> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
-  }
-
-  @Override
-  public long getAllocatedSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long total = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      total += memColumn.allocatedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public long getBufferedSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long total = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      total += memColumn.getBufferedSizeInMemory();
-    }
-    return total;
-  }
-
-  @Override
-  public String memUsageString() {
-    StringBuilder b = new StringBuilder("Store {\n");
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      b.append(memColumn.memUsageString(" "));
-    }
-    b.append("}\n");
-    return b.toString();
-  }
+public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
 
-  public long maxColMemSize() {
-    Collection<ColumnWriterV1> values = columns.values();
-    long max = 0;
-    for (ColumnWriterV1 memColumn : values) {
-      max = Math.max(max, memColumn.getBufferedSizeInMemory());
-    }
-    return max;
+  public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
+    super(schema, pageWriteStore, props);
   }
 
-  @Override
-  public void flush() {
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      memColumn.flush();
-    }
+  @Deprecated
+  public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
+      final ParquetProperties props) {
+    super(pageWriteStore, props);
   }
 
   @Override
-  public void endRecord() {
-    // V1 does not take record boundaries into account
-  }
-
-  public void close() {
-    Collection<ColumnWriterV1> values = columns.values();
-    for (ColumnWriterV1 memColumn : values) {
-      memColumn.close();
-    }
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    return new ColumnWriterV1(path, pageWriter, props);
   }
-
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index 7574cedf7..bf1090d0b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -18,158 +18,20 @@
  */
 package org.apache.parquet.column.impl;
 
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.util.Collections.unmodifiableMap;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.schema.MessageType;
 
-public class ColumnWriteStoreV2 implements ColumnWriteStore {
-
-  // will flush even if size bellow the threshold by this much to facilitate page alignment
-  private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
-
-  private final Map<ColumnDescriptor, ColumnWriterV2> columns;
-  private final Collection<ColumnWriterV2> writers;
-  private final ParquetProperties props;
-  private final long thresholdTolerance;
-  private long rowCount;
-  private long rowCountForNextSizeCheck;
-
-  public ColumnWriteStoreV2(
-      MessageType schema,
-      PageWriteStore pageWriteStore,
-      ParquetProperties props) {
-    this.props = props;
-    this.thresholdTolerance = (long)(props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
-    Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
-    for (ColumnDescriptor path : schema.getColumns()) {
-      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, props));
-    }
-    this.columns = unmodifiableMap(mcolumns);
-    this.writers = this.columns.values();
-
-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
-  }
-
-  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
-    return columns.get(path);
-  }
-
-  public Set<ColumnDescriptor> getColumnDescriptors() {
-    return columns.keySet();
-  }
-
-  @Override
-  public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
-        sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
-        sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
-        sb.append("\n");
-      }
-      return sb.toString();
-  }
-
-  @Override
-  public long getAllocatedSize() {
-    long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      total += memColumn.allocatedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public long getBufferedSize() {
-    long total = 0;
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      total += memColumn.getTotalBufferedSize();
-    }
-    return total;
-  }
-
-  @Override
-  public void flush() {
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      long rows = rowCount - memColumn.getRowsWrittenSoFar();
-      if (rows > 0) {
-        memColumn.writePage(rowCount);
-      }
-      memColumn.finalizeColumnChunk();
-    }
-  }
+public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
 
-  public String memUsageString() {
-    StringBuilder b = new StringBuilder("Store {\n");
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      b.append(memColumn.memUsageString(" "));
-    }
-    b.append("}\n");
-    return b.toString();
+  public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
+    super(schema, pageWriteStore, props);
   }
 
   @Override
-  public void close() {
-    flush(); // calling flush() here to keep it consistent with the behavior before merging with master
-    for (ColumnWriterV2 memColumn : columns.values()) {
-      memColumn.close();
-    }
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    return new ColumnWriterV2(path, pageWriter, props);
   }
-
-  @Override
-  public void endRecord() {
-    ++ rowCount;
-    if (rowCount >= rowCountForNextSizeCheck) {
-      sizeCheck();
-    }
-  }
-
-  private void sizeCheck() {
-    long minRecordToWait = Long.MAX_VALUE;
-    for (ColumnWriterV2 writer : writers) {
-      long usedMem = writer.getCurrentPageBufferedSize();
-      long rows = rowCount - writer.getRowsWrittenSoFar();
-      long remainingMem = props.getPageSizeThreshold() - usedMem;
-      if (remainingMem <= thresholdTolerance) {
-        writer.writePage(rowCount);
-        remainingMem = props.getPageSizeThreshold();
-      }
-      long rowsToFillPage =
-          usedMem == 0 ?
-              props.getMaxRowCountForPageSizeCheck()
-              : (long)((float)rows) / usedMem * remainingMem;
-      if (rowsToFillPage < minRecordToWait) {
-        minRecordToWait = rowsToFillPage;
-      }
-    }
-    if (minRecordToWait == Long.MAX_VALUE) {
-      minRecordToWait = props.getMinRowCountForPageSizeCheck();
-    }
-
-    if(props.estimateNextSizeCheck()) {
-      // will check again halfway if between min and max
-      rowCountForNextSizeCheck = rowCount +
-          min(
-              max(minRecordToWait / 2, props.getMinRowCountForPageSizeCheck()),
-              props.getMaxRowCountForPageSizeCheck());
-    } else {
-      rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
-    }
-  }
-
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
new file mode 100644
index 000000000..16085bb80
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import java.io.IOException;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base implementation for {@link ColumnWriter} to be extended to specialize for V1 and V2 pages.
+ */
+abstract class ColumnWriterBase implements ColumnWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterBase.class);
+
+  // By default: Debugging disabled this way (using the "if (DEBUG)" IN the methods) to allow
+  // the java compiler (not the JIT) to remove the unused statements during build time.
+  private static final boolean DEBUG = false;
+
+  final ColumnDescriptor path;
+  final PageWriter pageWriter;
+  private ValuesWriter repetitionLevelColumn;
+  private ValuesWriter definitionLevelColumn;
+  private ValuesWriter dataColumn;
+  private int valueCount;
+
+  private Statistics<?> statistics;
+  private long rowsWrittenSoFar = 0;
+
+  ColumnWriterBase(
+      ColumnDescriptor path,
+      PageWriter pageWriter,
+      ParquetProperties props) {
+    this.path = path;
+    this.pageWriter = pageWriter;
+    resetStatistics();
+
+    this.repetitionLevelColumn = createRLWriter(props, path);
+    this.definitionLevelColumn = createDLWriter(props, path);
+    this.dataColumn = props.newValuesWriter(path);
+  }
+
+  abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);
+
+  abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
+
+  private void log(Object value, int r, int d) {
+    LOG.debug("{} {} r:{} d:{}", path, value, r, d);
+  }
+
+  private void resetStatistics() {
+    this.statistics = Statistics.createStats(path.getPrimitiveType());
+  }
+
+  private void definitionLevel(int definitionLevel) {
+    definitionLevelColumn.writeInteger(definitionLevel);
+  }
+
+  private void repetitionLevel(int repetitionLevel) {
+    repetitionLevelColumn.writeInteger(repetitionLevel);
+  }
+
+  /**
+   * Writes the current null value
+   *
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  @Override
+  public void writeNull(int repetitionLevel, int definitionLevel) {
+    if (DEBUG)
+      log(null, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    statistics.incrementNumNulls();
+    ++valueCount;
+  }
+
+  @Override
+  public void close() {
+    // Close the Values writers.
+    repetitionLevelColumn.close();
+    definitionLevelColumn.close();
+    dataColumn.close();
+  }
+
+  @Override
+  public long getBufferedSizeInMemory() {
+    return repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize()
+        + pageWriter.getMemSize();
+  }
+
+  /**
+   * Writes the current value
+   *
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  @Override
+  public void write(double value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeDouble(value);
+    statistics.updateStats(value);
+    ++valueCount;
+  }
+
+  /**
+   * Writes the current value
+   *
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  @Override
+  public void write(float value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeFloat(value);
+    statistics.updateStats(value);
+    ++valueCount;
+  }
+
+  /**
+   * Writes the current value
+   *
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  @Override
+  public void write(Binary value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeBytes(value);
+    statistics.updateStats(value);
+    ++valueCount;
+  }
+
+  /**
+   * Writes the current value
+   *
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  @Override
+  public void write(boolean value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeBoolean(value);
+    statistics.updateStats(value);
+    ++valueCount;
+  }
+
+  /**
+   * Writes the current value
+   *
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  @Override
+  public void write(int value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeInteger(value);
+    statistics.updateStats(value);
+    ++valueCount;
+  }
+
+  /**
+   * Writes the current value
+   *
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  @Override
+  public void write(long value, int repetitionLevel, int definitionLevel) {
+    if (DEBUG)
+      log(value, repetitionLevel, definitionLevel);
+    repetitionLevel(repetitionLevel);
+    definitionLevel(definitionLevel);
+    dataColumn.writeLong(value);
+    statistics.updateStats(value);
+    ++valueCount;
+  }
+
+  /**
+   * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
+   * Is called right after writePage
+   */
+  void finalizeColumnChunk() {
+    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
+    if (dictionaryPage != null) {
+      if (DEBUG)
+        LOG.debug("write dictionary");
+      try {
+        pageWriter.writeDictionaryPage(dictionaryPage);
+      } catch (IOException e) {
+        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+      }
+      dataColumn.resetDictionary();
+    }
+  }
+
+  /**
+   * Used to decide when to write a page
+   *
+   * @return the number of bytes of memory used to buffer the current data
+   */
+  long getCurrentPageBufferedSize() {
+    return repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize();
+  }
+
+  /**
+   * Used to decide when to write a page or row group
+   *
+   * @return the number of bytes of memory used to buffer the current data and the previously written pages
+   */
+  long getTotalBufferedSize() {
+    return repetitionLevelColumn.getBufferedSize()
+        + definitionLevelColumn.getBufferedSize()
+        + dataColumn.getBufferedSize()
+        + pageWriter.getMemSize();
+  }
+
+  /**
+   * @return actual memory used
+   */
+  long allocatedSize() {
+    return repetitionLevelColumn.getAllocatedSize()
+        + definitionLevelColumn.getAllocatedSize()
+        + dataColumn.getAllocatedSize()
+        + pageWriter.allocatedSize();
+  }
+
+  /**
+   * @param indent
+   *          a prefix to format lines
+   * @return a formatted string showing how memory is used
+   */
+  String memUsageString(String indent) {
+    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
+    b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+    b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
+    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
+    b.append(indent).append(String.format("  total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
+    b.append(indent).append("}\n");
+    return b.toString();
+  }
+
+  long getRowsWrittenSoFar() {
+    return this.rowsWrittenSoFar;
+  }
+
+  /**
+   * Writes the buffered levels and values as one page through the abstract writePage overload,
+   * then resets the three values writers, the value count and the statistics for the next page.
+   *
+   * @param rowCount total rows written so far; the page gets (rowCount - rowsWrittenSoFar) rows
+   */
+  void writePage(long rowCount) {
+    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar); // rows since the previous page
+    this.rowsWrittenSoFar = rowCount; // updated before the write below is attempted
+    if (DEBUG)
+      LOG.debug("write page");
+    try {
+      writePage(pageRowCount, valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write page for " + path, e); // rethrown with the cause attached
+    }
+    repetitionLevelColumn.reset();
+    definitionLevelColumn.reset();
+    dataColumn.reset();
+    valueCount = 0;
+    resetStatistics(); // replaces the statistics object with a new one for the column's primitive type
+  }
+
+  abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
+      ValuesWriter definitionLevels, ValuesWriter values) throws IOException;
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index c1f5d67b0..646e31aa7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -23,261 +23,40 @@
 import java.io.IOException;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.parquet.io.api.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
  */
-final class ColumnWriterV1 implements ColumnWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterV1.class);
+final class ColumnWriterV1 extends ColumnWriterBase {
 
-  // By default: Debugging disabled this way (using the "if (DEBUG)" IN the methods) to allow
-  // the java compiler (not the JIT) to remove the unused statements during build time.
-  private static final boolean DEBUG = false;
-
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private final ParquetProperties props;
-
-  private ValuesWriter repetitionLevelColumn;
-  private ValuesWriter definitionLevelColumn;
-  private ValuesWriter dataColumn;
-  private int valueCount;
-  private int valueCountForNextSizeCheck;
-
-  private Statistics statistics;
-
-  public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
-                        ParquetProperties props) {
-    this.path = path;
-    this.pageWriter = pageWriter;
-    this.props = props;
-
-    // initial check of memory usage. So that we have enough data to make an initial prediction
-    this.valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
-
-    resetStatistics();
-
-    this.repetitionLevelColumn = props.newRepetitionLevelWriter(path);
-    this.definitionLevelColumn = props.newDefinitionLevelWriter(path);
-    this.dataColumn = props.newValuesWriter(path);
-  }
-
-  private void log(Object value, int r, int d) {
-    if (DEBUG) LOG.debug( "{} {} r:{} d:{}", path, value, r, d);
-  }
-
-  private void resetStatistics() {
-    this.statistics = Statistics.createStats(this.path.getPrimitiveType());
-  }
-
-  /**
-   * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
-   *
-   * We measure the memory used when we reach the mid point toward our estimated count.
-   * We then update the estimate and flush the page if we reached the threshold.
-   *
-   * That way we check the memory size log2(n) times.
-   *
-   */
-  private void accountForValueWritten() {
-    ++ valueCount;
-    if (valueCount > valueCountForNextSizeCheck) {
-      // not checking the memory used for every value
-      long memSize = repetitionLevelColumn.getBufferedSize()
-          + definitionLevelColumn.getBufferedSize()
-          + dataColumn.getBufferedSize();
-      if (memSize > props.getPageSizeThreshold()) {
-        // we will write the current page and check again the size at the predicted middle of next page
-        if (props.estimateNextSizeCheck()) {
-          valueCountForNextSizeCheck = valueCount / 2;
-        } else {
-          valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
-        }
-        writePage();
-      } else if (props.estimateNextSizeCheck()) {
-        // not reached the threshold, will check again midway
-        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * props.getPageSizeThreshold() / memSize)) / 2 + 1;
-      } else {
-        valueCountForNextSizeCheck += props.getMinRowCountForPageSizeCheck();
-      }
-    }
-  }
-
-  private void updateStatisticsNumNulls() {
-    statistics.incrementNumNulls();
-  }
-
-  private void updateStatistics(int value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(long value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(float value) {
-    statistics.updateStats(value);
-  }
-
-  private void updateStatistics(double value) {
-   statistics.updateStats(value);
-  }
-
-  private void updateStatistics(Binary value) {
-   statistics.updateStats(value);
-  }
-
-  private void updateStatistics(boolean value) {
-   statistics.updateStats(value);
-  }
-
-  private void writePage() {
-    if (DEBUG) LOG.debug("write page");
-    try {
-      pageWriter.writePage(
-          concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
-          valueCount,
-          statistics,
-          repetitionLevelColumn.getEncoding(),
-          definitionLevelColumn.getEncoding(),
-          dataColumn.getEncoding());
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
-    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    resetStatistics();
-  }
-
-  @Override
-  public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    updateStatisticsNumNulls();
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeDouble(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeFloat(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  @Override
-  public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeBytes(value);
-    updateStatistics(value);
-    accountForValueWritten();
+  ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    super(path, pageWriter, props);
   }
 
   @Override
-  public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeBoolean(value);
-    updateStatistics(value);
-    accountForValueWritten();
+  ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return props.newRepetitionLevelWriter(path);
   }
 
   @Override
-  public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeInteger(value);
-    updateStatistics(value);
-    accountForValueWritten();
+  ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return props.newDefinitionLevelWriter(path);
   }
 
   @Override
-  public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevelColumn.writeInteger(repetitionLevel);
-    definitionLevelColumn.writeInteger(definitionLevel);
-    dataColumn.writeLong(value);
-    updateStatistics(value);
-    accountForValueWritten();
-  }
-
-  public void flush() {
-    if (valueCount > 0) {
-      writePage();
-    }
-    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
-      try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
-      } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
-      }
-      dataColumn.resetDictionary();
-    }
-  }
-
-  @Override
-  public void close() {
-    flush();
-    // Close the Values writers.
-    repetitionLevelColumn.close();
-    definitionLevelColumn.close();
-    dataColumn.close();
-  }
-
-  @Override
-  public long getBufferedSizeInMemory() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
-  }
-
-  public long allocatedSize() {
-    return repetitionLevelColumn.getAllocatedSize()
-    + definitionLevelColumn.getAllocatedSize()
-    + dataColumn.getAllocatedSize()
-    + pageWriter.allocatedSize();
-  }
-
-  public String memUsageString(String indent) {
-    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
-    b.append(repetitionLevelColumn.memUsageString(indent + "  r:")).append("\n");
-    b.append(definitionLevelColumn.memUsageString(indent + "  d:")).append("\n");
-    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
-    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
-    b.append(indent).append(String.format("  total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n");
-    b.append(indent).append("}\n");
-    return b.toString();
+  void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
+      ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
+    pageWriter.writePage(
+        concat(repetitionLevels.getBytes(), definitionLevels.getBytes(), values.getBytes()),
+        valueCount,
+        rowCount,
+        statistics,
+        repetitionLevels.getEncoding(),
+        definitionLevels.getEncoding(),
+        values.getEncoding());
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index 9abdee8a5..04076c96b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -23,291 +23,67 @@
 import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
-import org.apache.parquet.io.api.Binary;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
  */
-final class ColumnWriterV2 implements ColumnWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterV2.class);
+final class ColumnWriterV2 extends ColumnWriterBase {
 
-  // By default: Debugging disabled this way (using the "if (DEBUG)" IN the methods) to allow
-  // the java compiler (not the JIT) to remove the unused statements during build time.
-  private static final boolean DEBUG = false;
-
-  private final ColumnDescriptor path;
-  private final PageWriter pageWriter;
-  private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
-  private RunLengthBitPackingHybridEncoder definitionLevelColumn;
-  private ValuesWriter dataColumn;
-  private int valueCount;
-
-  private Statistics<?> statistics;
-  private long rowsWrittenSoFar = 0;
-
-  public ColumnWriterV2(
-      ColumnDescriptor path,
-      PageWriter pageWriter,
-      ParquetProperties props) {
-    this.path = path;
-    this.pageWriter = pageWriter;
-    resetStatistics();
-
-    this.repetitionLevelColumn = props.newRepetitionLevelEncoder(path);
-    this.definitionLevelColumn = props.newDefinitionLevelEncoder(path);
-    this.dataColumn = props.newValuesWriter(path);
-  }
-
-  private void log(Object value, int r, int d) {
-    LOG.debug("{} {} r:{} d:{}", path, value, r, d);
-  }
-
-  private void resetStatistics() {
-    this.statistics = Statistics.createStats(path.getPrimitiveType());
-  }
-
-  private void definitionLevel(int definitionLevel) {
-    try {
-      definitionLevelColumn.writeInt(definitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
+  // Extends the original implementation so that it does not write the size of the data, as the original writer would
+  private static class RLEWriterForV2 extends RunLengthBitPackingHybridValuesWriter {
+    public RLEWriterForV2(RunLengthBitPackingHybridEncoder encoder) {
+      super(encoder);
     }
-  }
 
-  private void repetitionLevel(int repetitionLevel) {
-    try {
-      repetitionLevelColumn.writeInt(repetitionLevel);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
-    }
-  }
-
-  /**
-   * writes the current null value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void writeNull(int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    statistics.incrementNumNulls();
-    ++ valueCount;
-  }
-
-  @Override
-  public void close() {
-    // Close the Values writers.
-    repetitionLevelColumn.close();
-    definitionLevelColumn.close();
-    dataColumn.close();
-  }
-
-  @Override
-  public long getBufferedSizeInMemory() {
-    return repetitionLevelColumn.getBufferedSize()
-      + definitionLevelColumn.getBufferedSize()
-      + dataColumn.getBufferedSize()
-      + pageWriter.getMemSize();
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(double value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeDouble(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(float value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeFloat(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(Binary value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBytes(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(boolean value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBoolean(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(int value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeInteger(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * writes the current value
-   * @param value
-   * @param repetitionLevel
-   * @param definitionLevel
-   */
-  public void write(long value, int repetitionLevel, int definitionLevel) {
-    if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeLong(value);
-    statistics.updateStats(value);
-    ++ valueCount;
-  }
-
-  /**
-   * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
-   * Is called right after writePage
-   */
-  public void finalizeColumnChunk() {
-    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
+    @Override
+    public BytesInput getBytes() {
       try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
+        return encoder.toBytes();
       } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+        throw new ParquetEncodingException(e);
       }
-      dataColumn.resetDictionary();
     }
   }
 
-  /**
-   * used to decide when to write a page
-   * @return the number of bytes of memory used to buffer the current data
-   */
-  public long getCurrentPageBufferedSize() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize();
-  }
-
-  /**
-   * used to decide when to write a page or row group
-   * @return the number of bytes of memory used to buffer the current data and the previously written pages
-   */
-  public long getTotalBufferedSize() {
-    return repetitionLevelColumn.getBufferedSize()
-        + definitionLevelColumn.getBufferedSize()
-        + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
-  }
+  private static final ValuesWriter NULL_WRITER = new DevNullValuesWriter();
 
-  /**
-   * @return actual memory used
-   */
-  public long allocatedSize() {
-    return repetitionLevelColumn.getAllocatedSize()
-    + definitionLevelColumn.getAllocatedSize()
-    + dataColumn.getAllocatedSize()
-    + pageWriter.allocatedSize();
+  ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
+    super(path, pageWriter, props);
   }
 
-  /**
-   * @param indent a prefix to format lines
-   * @return a formatted string showing how memory is used
-   */
-  public String memUsageString(String indent) {
-    StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
-    b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
-    b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
-    b.append(dataColumn.memUsageString(indent + "  data:")).append("\n");
-    b.append(pageWriter.memUsageString(indent + "  pages:")).append("\n");
-    b.append(indent).append(String.format("  total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
-    b.append(indent).append("}\n");
-    return b.toString();
+  @Override
+  ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return path.getMaxRepetitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newRepetitionLevelEncoder(path));
   }
 
-  public long getRowsWrittenSoFar() {
-    return this.rowsWrittenSoFar;
+  @Override
+  ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
+    return path.getMaxDefinitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newDefinitionLevelEncoder(path));
   }
 
-  /**
-   * writes the current data to a new page in the page store
-   * @param rowCount how many rows have been written so far
-   */
-  public void writePage(long rowCount) {
-    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
-    this.rowsWrittenSoFar = rowCount;
-    if (DEBUG) LOG.debug("write page");
-    try {
-      // TODO: rework this API. Those must be called *in that order*
-      BytesInput bytes = dataColumn.getBytes();
-      Encoding encoding = dataColumn.getEncoding();
-      pageWriter.writePageV2(
-          pageRowCount,
-          Ints.checkedCast(statistics.getNumNulls()),
-          valueCount,
-          path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
-          path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
-          encoding,
-          bytes,
-          statistics
-          );
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
-    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    resetStatistics();
+  @Override
+  void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
+      ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
+    // TODO: rework this API. The bytes must be retrieved before the encoding (the encoding might be different otherwise)
+    BytesInput bytes = values.getBytes();
+    Encoding encoding = values.getEncoding();
+    pageWriter.writePageV2(
+        rowCount,
+        Ints.checkedCast(statistics.getNumNulls()),
+        valueCount,
+        repetitionLevels.getBytes(),
+        definitionLevels.getBytes(),
+        encoding,
+        bytes,
+        statistics);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
index a2d079f9c..a72be48b5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.Statistics;
@@ -39,9 +38,25 @@
    * @param dlEncoding definition level encoding
    * @param valuesEncoding values encoding
    * @throws IOException if there is an exception while writing page data
+   * @deprecated will be removed in 2.0.0. This method does not support writing column indexes; use
+   *             {@link #writePage(BytesInput, int, int, Statistics, Encoding, Encoding, Encoding)} instead
    */
+  @Deprecated
   void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
 
+  /**
+   * writes a single page
+   * @param bytesInput the bytes for the page
+   * @param valueCount the number of values in that page
+   * @param rowCount the number of rows in that page
+   * @param statistics the statistics for that page
+   * @param rlEncoding repetition level encoding
+   * @param dlEncoding definition level encoding
+   * @param valuesEncoding values encoding
+   * @throws IOException if there is an exception while writing page data
+   */
+  void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+
   /**
    * writes a single page in the new format
    * @param rowCount the number of rows in this page
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index 3b7a5def4..a51a8c4d8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
+import java.util.Objects;
 
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Ints;
@@ -28,10 +29,14 @@
 import org.apache.parquet.io.ParquetEncodingException;
 
 public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
-  private final RunLengthBitPackingHybridEncoder encoder;
+  protected final RunLengthBitPackingHybridEncoder encoder;
 
   public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
-    this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize, allocator);
+    this(new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize, allocator));
+  }
+
+  protected RunLengthBitPackingHybridValuesWriter(RunLengthBitPackingHybridEncoder encoder) {
+    this.encoder = Objects.requireNonNull(encoder);
   }
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
new file mode 100644
index 000000000..c35251680
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class BinaryColumnIndex extends ColumnIndexBase {
+    private Binary[] minValues;
+    private Binary[] maxValues;
+
+    private BinaryColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final List<Binary> minValues = new ArrayList<>();
+  private final List<Binary> maxValues = new ArrayList<>();
+
+  private static Binary convert(ByteBuffer buffer) {
+    return Binary.fromReusedByteBuffer(buffer);
+  }
+
+  private static ByteBuffer convert(Binary value) {
+    return value.toByteBuffer();
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? null : convert(min));
+    maxValues.add(max == null ? null : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add((Binary) min);
+    maxValues.add((Binary) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    BinaryColumnIndex columnIndex = new BinaryColumnIndex(type);
+    columnIndex.minValues = minValues.toArray(new Binary[minValues.size()]);
+    columnIndex.maxValues = maxValues.toArray(new Binary[maxValues.size()]);
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
new file mode 100644
index 000000000..9a4ea89c6
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
+import it.unimi.dsi.fastutil.booleans.BooleanList;
+
+class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class BooleanColumnIndex extends ColumnIndexBase {
+    private boolean[] minValues;
+    private boolean[] maxValues;
+
+    private BooleanColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final BooleanList minValues = new BooleanArrayList();
+  private final BooleanList maxValues = new BooleanArrayList();
+
+  private static boolean convert(ByteBuffer buffer) {
+    return buffer.get(0) != 0;
+  }
+
+  private static ByteBuffer convert(boolean value) {
+    return ByteBuffer.allocate(1).put(0, value ? (byte) 1 : 0);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? false : convert(min));
+    maxValues.add(max == null ? false : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? false : (boolean) min);
+    maxValues.add(max == null ? false : (boolean) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    BooleanColumnIndex columnIndex = new BooleanColumnIndex(type);
+    columnIndex.minValues = minValues.toBooleanArray();
+    columnIndex.maxValues = maxValues.toBooleanArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
new file mode 100644
index 000000000..5d8281538
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BoundaryOrder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+/**
+ * Enum for {@link org.apache.parquet.format.BoundaryOrder}.
+ */
+public enum BoundaryOrder {
+  UNORDERED, ASCENDING, DESCENDING;
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
new file mode 100644
index 000000000..f7bd16b92
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * The min/max and null count values of the pages in a column chunk. The getters are used for converting to the
+ * related thrift object.
+ *
+ * @see org.apache.parquet.format.ColumnIndex
+ */
+public interface ColumnIndex {
+  /**
+   * Used for converting to the related thrift object.
+   *
+   * @return the boundary order of the min/max values
+   */
+  BoundaryOrder getBoundaryOrder();
+
+  /**
+   * Used for converting to the related thrift object.
+   *
+   * @return the unmodifiable list of null counts
+   */
+  List<Long> getNullCounts();
+
+  /**
+   * Used for converting to the related thrift object.
+   *
+   * @return the unmodifiable list of null pages
+   */
+  List<Boolean> getNullPages();
+
+  /**
+   * Used for converting to the related thrift object.
+   *
+   * @return the list of the min values as {@link ByteBuffer}s
+   */
+  List<ByteBuffer> getMinValues();
+
+  /**
+   * Used for converting to the related thrift object.
+   *
+   * @return the list of the max values as {@link ByteBuffer}s
+   */
+  List<ByteBuffer> getMaxValues();
+
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
new file mode 100644
index 000000000..6edd75355
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.util.Objects.requireNonNull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
+import it.unimi.dsi.fastutil.booleans.BooleanList;
+import it.unimi.dsi.fastutil.booleans.BooleanLists;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+import it.unimi.dsi.fastutil.longs.LongLists;
+
+/**
+ * Builder implementation to create {@link ColumnIndex} objects during writing a parquet file.
+ * <p>
+ * A builder obtained from {@link #getBuilder(PrimitiveType)} collects page statistics via {@link #add(Statistics)}
+ * and creates the {@link ColumnIndex} in {@link #build()}. The static
+ * {@link #build(PrimitiveType, BoundaryOrder, List, List, List, List)} method creates a {@link ColumnIndex} directly
+ * from already available values.
+ */
+public abstract class ColumnIndexBuilder {
+
+  /**
+   * Base class of the type specific {@link ColumnIndex} implementations. It stores the null pages, the null counts
+   * and the boundary order while the subclasses store the min/max values.
+   */
+  static abstract class ColumnIndexBase implements ColumnIndex {
+    // Written in place of the min/max values of null pages
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+    // Values longer than this are shortened in toString(): the start and the end are kept around the marker
+    private static final int MAX_VALUE_LENGTH_FOR_TOSTRING = 40;
+    private static final String TOSTRING_TRUNCATION_MARKER = "(...)";
+    private static final int TOSTRING_TRUNCATION_START_POS =
+        (MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length()) / 2;
+    private static final int TOSTRING_TRUNCATION_END_POS =
+        MAX_VALUE_LENGTH_FOR_TOSTRING - TOSTRING_TRUNCATION_MARKER.length() - TOSTRING_TRUNCATION_START_POS;
+    private static final String TOSTRING_MISSING_VALUE_MARKER = "<none>";
+
+    final PrimitiveStringifier stringifier;
+    final PrimitiveComparator<Binary> comparator;
+    private boolean[] nullPages;
+    private BoundaryOrder boundaryOrder;
+    // might be null
+    private long[] nullCounts;
+
+    /**
+     * Shortens the specified string to at most {@code MAX_VALUE_LENGTH_FOR_TOSTRING} characters by replacing its
+     * middle part with the truncation marker.
+     */
+    static String truncate(String str) {
+      if (str.length() <= MAX_VALUE_LENGTH_FOR_TOSTRING) {
+        return str;
+      }
+      return str.substring(0, TOSTRING_TRUNCATION_START_POS) + TOSTRING_TRUNCATION_MARKER
+          + str.substring(str.length() - TOSTRING_TRUNCATION_END_POS);
+    }
+
+    ColumnIndexBase(PrimitiveType type) {
+      comparator = type.comparator();
+      stringifier = type.stringifier();
+    }
+
+    @Override
+    public BoundaryOrder getBoundaryOrder() {
+      return boundaryOrder;
+    }
+
+    /**
+     * @return the unmodifiable list of null counts or {@code null} if no null counts are available
+     */
+    @Override
+    public List<Long> getNullCounts() {
+      if (nullCounts == null) {
+        return null;
+      }
+      return LongLists.unmodifiable(LongArrayList.wrap(nullCounts));
+    }
+
+    @Override
+    public List<Boolean> getNullPages() {
+      return BooleanLists.unmodifiable(BooleanArrayList.wrap(nullPages));
+    }
+
+    /**
+     * @return the newly created list of the serialized min values; an empty buffer is used for null pages
+     */
+    @Override
+    public List<ByteBuffer> getMinValues() {
+      List<ByteBuffer> list = new ArrayList<>(getPageCount());
+      for (int i = 0, n = getPageCount(); i < n; ++i) {
+        if (isNullPage(i)) {
+          list.add(EMPTY_BYTE_BUFFER);
+        } else {
+          list.add(getMinValueAsBytes(i));
+        }
+      }
+      return list;
+    }
+
+    /**
+     * @return the newly created list of the serialized max values; an empty buffer is used for null pages
+     */
+    @Override
+    public List<ByteBuffer> getMaxValues() {
+      List<ByteBuffer> list = new ArrayList<>(getPageCount());
+      for (int i = 0, n = getPageCount(); i < n; ++i) {
+        if (isNullPage(i)) {
+          list.add(EMPTY_BYTE_BUFFER);
+        } else {
+          list.add(getMaxValueAsBytes(i));
+        }
+      }
+      return list;
+    }
+
+    /**
+     * @return a table-like representation: the boundary order followed by one line per page listing the null
+     *         count and the (possibly truncated) min and max values
+     */
+    @Override
+    public String toString() {
+      try (Formatter formatter = new Formatter()) {
+        formatter.format("Boundary order: %s\n", boundaryOrder);
+        String minMaxPart = "  %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s  %-" + MAX_VALUE_LENGTH_FOR_TOSTRING + "s\n";
+        formatter.format("%-10s  %20s" + minMaxPart, "", "null count", "min", "max");
+        String format = "page-%-5d  %20s" + minMaxPart;
+        for (int i = 0, n = nullPages.length; i < n; ++i) {
+          String nullCount = nullCounts == null ? TOSTRING_MISSING_VALUE_MARKER : Long.toString(nullCounts[i]);
+          String min, max;
+          if (nullPages[i]) {
+            // Null pages have no meaningful min/max values
+            min = max = TOSTRING_MISSING_VALUE_MARKER;
+          } else {
+            min = truncate(getMinValueAsString(i));
+            max = truncate(getMaxValueAsString(i));
+          }
+          formatter.format(format, i, nullCount, min, max);
+        }
+        return formatter.toString();
+      }
+    }
+
+    int getPageCount() {
+      return nullPages.length;
+    }
+
+    boolean isNullPage(int pageIndex) {
+      return nullPages[pageIndex];
+    }
+
+    abstract ByteBuffer getMinValueAsBytes(int pageIndex);
+
+    abstract ByteBuffer getMaxValueAsBytes(int pageIndex);
+
+    abstract String getMinValueAsString(int pageIndex);
+
+    abstract String getMaxValueAsString(int pageIndex);
+  }
+
+  // Shared builder that ignores every input and builds null; see getNoOpBuilder()
+  private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {
+    @Override
+    public ColumnIndex build() {
+      return null;
+    }
+
+    @Override
+    public void add(Statistics<?> stats) {
+    }
+
+    @Override
+    void addMinMax(Object min, Object max) {
+    }
+
+    @Override
+    ColumnIndexBase createColumnIndex(PrimitiveType type) {
+      return null;
+    }
+
+    @Override
+    void clearMinMax() {
+    }
+
+    @Override
+    void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    }
+
+    @Override
+    int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+      return 0;
+    }
+
+    @Override
+    int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+      return 0;
+    }
+  };
+
+  // Set by getBuilder(PrimitiveType); used by build()
+  private PrimitiveType type;
+  private final BooleanList nullPages = new BooleanArrayList();
+  private final LongList nullCounts = new LongArrayList();
+
+  /**
+   * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at
+   *         {@link #build()}.
+   */
+  public static ColumnIndexBuilder getNoOpBuilder() {
+    return NO_OP_BUILDER;
+  }
+
+  /**
+   * @param type
+   *          the type this builder is to be created for
+   * @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects
+   */
+  public static ColumnIndexBuilder getBuilder(PrimitiveType type) {
+    ColumnIndexBuilder builder = createNewBuilder(type.getPrimitiveTypeName());
+    builder.type = type;
+    return builder;
+  }
+
+  /**
+   * Creates the builder implementation matching the specified primitive type name.
+   *
+   * @throws IllegalArgumentException
+   *           if there is no builder implementation for the type
+   */
+  private static ColumnIndexBuilder createNewBuilder(PrimitiveTypeName type) {
+    switch (type) {
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+      case INT96:
+        return new BinaryColumnIndexBuilder();
+      case BOOLEAN:
+        return new BooleanColumnIndexBuilder();
+      case DOUBLE:
+        return new DoubleColumnIndexBuilder();
+      case FLOAT:
+        return new FloatColumnIndexBuilder();
+      case INT32:
+        return new IntColumnIndexBuilder();
+      case INT64:
+        return new LongColumnIndexBuilder();
+      default:
+        throw new IllegalArgumentException("Unsupported type for column index: " + type);
+    }
+  }
+
+  /**
+   * @param type
+   *          the primitive type
+   * @param boundaryOrder
+   *          the boundary order of the min/max values
+   * @param nullPages
+   *          the null pages (one boolean value for each page that signifies whether the page consists of nulls
+   *          entirely)
+   * @param nullCounts
+   *          the number of null values for each page; might be {@code null} as it is optional
+   * @param minValues
+   *          the min values for each page
+   * @param maxValues
+   *          the max values for each page
+   * @return the newly created {@link ColumnIndex} object based on the specified arguments or {@code null} if
+   *         {@code nullPages} is empty
+   * @throws IllegalArgumentException
+   *           if the sizes of the specified lists are not equal
+   * @throws NullPointerException
+   *           if {@code boundaryOrder} is {@code null} while there is at least one page
+   */
+  public static ColumnIndex build(
+      PrimitiveType type,
+      BoundaryOrder boundaryOrder,
+      List<Boolean> nullPages,
+      List<Long> nullCounts,
+      List<ByteBuffer> minValues,
+      List<ByteBuffer> maxValues) {
+
+    // Builders are stateful (fill() clears then repopulates them), so a fresh instance is created for every call
+    // instead of sharing cached ones; shared instances would let concurrent invocations corrupt each other.
+    ColumnIndexBuilder builder = createNewBuilder(type.getPrimitiveTypeName());
+
+    builder.fill(nullPages, nullCounts, minValues, maxValues);
+    ColumnIndexBase columnIndex = builder.build(type);
+    if (columnIndex == null) {
+      // No pages were specified; behave like build() instead of failing on the null reference
+      return null;
+    }
+    columnIndex.boundaryOrder = requireNonNull(boundaryOrder);
+    return columnIndex;
+  }
+
+  ColumnIndexBuilder() {
+    // Shall be able to be created inside this package only
+  }
+
+  /**
+   * Adds the data from the specified statistics to this builder
+   *
+   * @param stats
+   *          the statistics to be added
+   */
+  public void add(Statistics<?> stats) {
+    if (stats.hasNonNullValue()) {
+      nullPages.add(false);
+      addMinMax(stats.genericGetMin(), stats.genericGetMax());
+    } else {
+      // No non-null value: the page is registered as a null page with placeholder min/max values
+      nullPages.add(true);
+      addMinMax(null, null);
+    }
+    nullCounts.add(stats.getNumNulls());
+  }
+
+  /**
+   * Adds the specified serialized min/max values; {@code null} arguments are passed for null pages.
+   */
+  abstract void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max);
+
+  /**
+   * Adds the specified min/max values; {@code null} arguments are passed for null pages.
+   */
+  abstract void addMinMax(Object min, Object max);
+
+  /**
+   * Replaces the content of this builder with the specified values.
+   *
+   * @throws IllegalArgumentException
+   *           if the sizes of the specified lists are not equal ({@code nullCounts} is checked only if it is not
+   *           {@code null})
+   */
+  private void fill(List<Boolean> nullPages, List<Long> nullCounts, List<ByteBuffer> minValues,
+      List<ByteBuffer> maxValues) {
+    clear();
+    int requiredSize = nullPages.size();
+    if ((nullCounts != null && nullCounts.size() != requiredSize) || minValues.size() != requiredSize
+        || maxValues.size() != requiredSize) {
+      throw new IllegalArgumentException(
+          String.format("Not all sizes are equal (nullPages:%d, nullCounts:%s, minValues:%d, maxValues:%d)",
+              nullPages.size(), nullCounts == null ? "null" : nullCounts.size(), minValues.size(), maxValues.size()));
+    }
+    this.nullPages.addAll(nullPages);
+    // Null counts is optional in the format
+    if (nullCounts != null) {
+      this.nullCounts.addAll(nullCounts);
+    }
+
+    for (int i = 0; i < requiredSize; ++i) {
+      if (nullPages.get(i)) {
+        // The min/max buffers of null pages are not read
+        addMinMaxFromBytes(null, null);
+      } else {
+        addMinMaxFromBytes(minValues.get(i), maxValues.get(i));
+      }
+    }
+  }
+
+  /**
+   * @return the newly created column index or {@code null} if the {@link ColumnIndex} would be empty
+   */
+  public ColumnIndex build() {
+    ColumnIndexBase columnIndex = build(type);
+    if (columnIndex == null) {
+      return null;
+    }
+    columnIndex.boundaryOrder = calculateBoundaryOrder(type.comparator());
+    return columnIndex;
+  }
+
+  /**
+   * Creates the column index from the collected values without setting its boundary order.
+   *
+   * @return the newly created column index or {@code null} if no pages were added
+   */
+  private ColumnIndexBase build(PrimitiveType type) {
+    if (nullPages.isEmpty()) {
+      return null;
+    }
+    ColumnIndexBase columnIndex = createColumnIndex(type);
+    columnIndex.nullPages = nullPages.toBooleanArray();
+    // Null counts is optional so keep it null if the builder has no values
+    if (!nullCounts.isEmpty()) {
+      columnIndex.nullCounts = nullCounts.toLongArray();
+    }
+
+    return columnIndex;
+  }
+
+  /**
+   * Determines the boundary order of the collected min/max values. Ascending is checked first so it wins if the
+   * values are both ascending and descending (e.g. there is only one non-null page).
+   */
+  private BoundaryOrder calculateBoundaryOrder(PrimitiveComparator<Binary> comparator) {
+    if (isAscending(comparator)) {
+      return BoundaryOrder.ASCENDING;
+    } else if (isDescending(comparator)) {
+      return BoundaryOrder.DESCENDING;
+    } else {
+      return BoundaryOrder.UNORDERED;
+    }
+  }
+
+  // min[i] <= min[i+1] && max[i] <= max[i+1]
+  private boolean isAscending(PrimitiveComparator<Binary> comparator) {
+    int prevPage = nextNonNullPage(0);
+    // All pages are null-page
+    if (prevPage < 0) {
+      return false;
+    }
+    int nextPage = nextNonNullPage(prevPage + 1);
+    // nextNonNullPage returns -1 if there are no more non-null pages; a found index is always at least 1 here
+    while (nextPage > 0) {
+      if (compareMinValues(comparator, prevPage, nextPage) > 0
+          || compareMaxValues(comparator, prevPage, nextPage) > 0) {
+        return false;
+      }
+      prevPage = nextPage;
+      nextPage = nextNonNullPage(nextPage + 1);
+    }
+    return true;
+  }
+
+  // min[i] >= min[i+1] && max[i] >= max[i+1]
+  private boolean isDescending(PrimitiveComparator<Binary> comparator) {
+    int prevPage = nextNonNullPage(0);
+    // All pages are null-page
+    if (prevPage < 0) {
+      return false;
+    }
+    int nextPage = nextNonNullPage(prevPage + 1);
+    // nextNonNullPage returns -1 if there are no more non-null pages; a found index is always at least 1 here
+    while (nextPage > 0) {
+      if (compareMinValues(comparator, prevPage, nextPage) < 0
+          || compareMaxValues(comparator, prevPage, nextPage) < 0) {
+        return false;
+      }
+      prevPage = nextPage;
+      nextPage = nextNonNullPage(nextPage + 1);
+    }
+    return true;
+  }
+
+  /**
+   * @return the index of the first non-null page at or after {@code startIndex} or {@code -1} if there is none
+   */
+  private int nextNonNullPage(int startIndex) {
+    for (int i = startIndex, n = nullPages.size(); i < n; ++i) {
+      if (!nullPages.get(i)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Compares the min values collected at the specified indexes by using the specified comparator.
+   */
+  abstract int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
+
+  /**
+   * Compares the max values collected at the specified indexes by using the specified comparator.
+   */
+  abstract int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2);
+
+  private void clear() {
+    nullPages.clear();
+    nullCounts.clear();
+    clearMinMax();
+  }
+
+  /**
+   * Removes all the collected min/max values.
+   */
+  abstract void clearMinMax();
+
+  /**
+   * Creates the type specific column index object filled with the collected min/max values.
+   */
+  abstract ColumnIndexBase createColumnIndex(PrimitiveType type);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
new file mode 100644
index 000000000..249652aa2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.doubles.DoubleList;
+
+class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class DoubleColumnIndex extends ColumnIndexBase {
+    private double[] minValues;
+    private double[] maxValues;
+
+    private DoubleColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final DoubleList minValues = new DoubleArrayList();
+  private final DoubleList maxValues = new DoubleArrayList();
+
+  private static double convert(ByteBuffer buffer) {
+    return buffer.order(LITTLE_ENDIAN).getDouble(0);
+  }
+
+  private static ByteBuffer convert(double value) {
+    return ByteBuffer.allocate(Double.SIZE / 8).order(LITTLE_ENDIAN).putDouble(0, value);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? 0 : convert(min));
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? 0 : (double) min);
+    maxValues.add(max == null ? 0 : (double) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    DoubleColumnIndex columnIndex = new DoubleColumnIndex(type);
+    columnIndex.minValues = minValues.toDoubleArray();
+    columnIndex.maxValues = maxValues.toDoubleArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
new file mode 100644
index 000000000..24c911fae
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.floats.FloatList;
+
+class FloatColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class FloatColumnIndex extends ColumnIndexBase {
+    private float[] minValues;
+    private float[] maxValues;
+
+    private FloatColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final FloatList minValues = new FloatArrayList();
+  private final FloatList maxValues = new FloatArrayList();
+
+  private static float convert(ByteBuffer buffer) {
+    return buffer.order(LITTLE_ENDIAN).getFloat(0);
+  }
+
+  private static ByteBuffer convert(float value) {
+    return ByteBuffer.allocate(Float.SIZE / 8).order(LITTLE_ENDIAN).putFloat(0, value);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? 0 : convert(min));
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? 0 : (float) min);
+    maxValues.add(max == null ? 0 : (float) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    FloatColumnIndex columnIndex = new FloatColumnIndex(type);
+    columnIndex.minValues = minValues.toFloatArray();
+    columnIndex.maxValues = maxValues.toFloatArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
new file mode 100644
index 000000000..e4a117c6f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+class IntColumnIndexBuilder extends ColumnIndexBuilder {
+  private static class IntColumnIndex extends ColumnIndexBase {
+    private int[] minValues;
+    private int[] maxValues;
+
+    private IntColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) {
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) {
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) {
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final IntList minValues = new IntArrayList();
+  private final IntList maxValues = new IntArrayList();
+
+  private static int convert(ByteBuffer buffer) {
+    return buffer.order(LITTLE_ENDIAN).getInt(0);
+  }
+
+  private static ByteBuffer convert(int value) {
+    return ByteBuffer.allocate(Integer.SIZE / 8).order(LITTLE_ENDIAN).putInt(0, value);
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
+    minValues.add(min == null ? 0 : convert(min));
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) {
+    minValues.add(min == null ? 0 : (int) min);
+    maxValues.add(max == null ? 0 : (int) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) {
+    IntColumnIndex columnIndex = new IntColumnIndex(type);
+    columnIndex.minValues = minValues.toIntArray();
+    columnIndex.maxValues = maxValues.toIntArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() {
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(minValues.get(index1), minValues.get(index2));
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) {
+    return comparator.compare(maxValues.get(index1), maxValues.get(index2));
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
new file mode 100644
index 000000000..94e7e0f27
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveType;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+class LongColumnIndexBuilder extends ColumnIndexBuilder { // column index builder that keeps min/max values as primitive longs
+  private static class LongColumnIndex extends ColumnIndexBase { // built index; its arrays are filled in by createColumnIndex
+    private long[] minValues;
+    private long[] maxValues;
+
+    private LongColumnIndex(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    ByteBuffer getMinValueAsBytes(int pageIndex) { // stored minimum as 8 little-endian bytes
+      return convert(minValues[pageIndex]);
+    }
+
+    @Override
+    ByteBuffer getMaxValueAsBytes(int pageIndex) { // stored maximum as 8 little-endian bytes
+      return convert(maxValues[pageIndex]);
+    }
+
+    @Override
+    String getMinValueAsString(int pageIndex) { // stringifier comes from outside this class (presumably the base class -- confirm)
+      return stringifier.stringify(minValues[pageIndex]);
+    }
+
+    @Override
+    String getMaxValueAsString(int pageIndex) {
+      return stringifier.stringify(maxValues[pageIndex]);
+    }
+  }
+
+  private final LongList minValues = new LongArrayList(); // one entry per added min/max pair
+  private final LongList maxValues = new LongArrayList();
+
+  private static long convert(ByteBuffer buffer) { // decodes a long stored as little-endian bytes
+    return buffer.order(LITTLE_ENDIAN).getLong(0); // absolute read at index 0; order() also switches the byte order of the passed buffer
+  }
+
+  private static ByteBuffer convert(long value) { // encodes a long as 8 little-endian bytes in a newly allocated buffer
+    return ByteBuffer.allocate(Long.SIZE / 8).order(LITTLE_ENDIAN).putLong(0, value); // absolute put, so the position stays at 0
+  }
+
+  @Override
+  void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) { // appends one min/max pair given in serialized form
+    minValues.add(min == null ? 0 : convert(min)); // a null bound is stored as the placeholder 0
+    maxValues.add(max == null ? 0 : convert(max));
+  }
+
+  @Override
+  void addMinMax(Object min, Object max) { // appends one min/max pair given as objects
+    minValues.add(min == null ? 0 : (long) min); // a non-null value must be a Long, otherwise the cast throws ClassCastException
+    maxValues.add(max == null ? 0 : (long) max);
+  }
+
+  @Override
+  ColumnIndexBase createColumnIndex(PrimitiveType type) { // creates a LongColumnIndex holding the values collected so far
+    LongColumnIndex columnIndex = new LongColumnIndex(type);
+    columnIndex.minValues = minValues.toLongArray();
+    columnIndex.maxValues = maxValues.toLongArray();
+    return columnIndex;
+  }
+
+  @Override
+  void clearMinMax() { // discards every min and max value collected so far
+    minValues.clear();
+    maxValues.clear();
+  }
+
+  @Override
+  int compareMinValues(PrimitiveComparator<Binary> comparator, int index1, int index2) { // orders the stored min values at the two indexes
+    return comparator.compare(minValues.getLong(index1), minValues.getLong(index2)); // getLong reads the primitive directly; get(int) would box to Long first
+  }
+
+  @Override
+  int compareMaxValues(PrimitiveComparator<Binary> comparator, int index1, int index2) { // orders the stored max values at the two indexes
+    return comparator.compare(maxValues.getLong(index1), maxValues.getLong(index2)); // getLong reads the primitive directly; get(int) would box to Long first
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
new file mode 100644
index 000000000..fd99ef3a9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+/**
+ * Per-page location data: the file offset, the compressed size and the first row index of each page.
+ *
+ * @see org.apache.parquet.format.OffsetIndex
+ */
+public interface OffsetIndex {
+  /**
+   * @return how many pages this index describes
+   */
+  int getPageCount();
+
+  /**
+   * @param pageIndex
+   *          position of the page within this index
+   * @return where the page starts in the file
+   */
+  long getOffset(int pageIndex);
+
+  /**
+   * @param pageIndex
+   *          position of the page within this index
+   * @return size of the compressed page, page header included
+   */
+  int getCompressedPageSize(int pageIndex);
+
+  /**
+   * @param pageIndex
+   *          position of the page within this index
+   * @return index of the first row stored in the page
+   */
+  long getFirstRowIndex(int pageIndex);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
new file mode 100644
index 000000000..e4907b548
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.util.Formatter;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongList;
+
+/**
+ * Builder implementation to create {@link OffsetIndex} objects during writing a parquet file.
+ */
+public class OffsetIndexBuilder {
+
+  private static class OffsetIndexImpl implements OffsetIndex {
+    private long[] offsets;
+    private int[] compressedPageSizes;
+    private long[] firstRowIndexes;
+
+    @Override
+    public String toString() {
+      try (Formatter formatter = new Formatter()) {
+        formatter.format("%-10s  %20s  %16s  %20s\n", "", "offset", "compressed size", "first row index");
+        for (int i = 0, n = offsets.length; i < n; ++i) {
+          formatter.format("page-%-5d  %20d  %16d  %20d\n", i, offsets[i], compressedPageSizes[i], firstRowIndexes[i]);
+        }
+        return formatter.toString();
+      }
+    }
+
+    @Override
+    public int getPageCount() {
+      return offsets.length;
+    }
+
+    @Override
+    public long getOffset(int pageIndex) {
+      return offsets[pageIndex];
+    }
+
+    @Override
+    public int getCompressedPageSize(int pageIndex) {
+      return compressedPageSizes[pageIndex];
+    }
+
+    @Override
+    public long getFirstRowIndex(int pageIndex) {
+      return firstRowIndexes[pageIndex];
+    }
+  }
+
+  private static final OffsetIndexBuilder NO_OP_BUILDER = new OffsetIndexBuilder() {
+    @Override
+    public void add(int compressedPageSize, long rowCount) {
+    }
+
+    @Override
+    public void add(long offset, int compressedPageSize, long rowCount) {
+    }
+  };
+
+  private final LongList offsets = new LongArrayList();
+  private final IntList compressedPageSizes = new IntArrayList();
+  private final LongList firstRowIndexes = new LongArrayList();
+  private long previousOffset;
+  private int previousPageSize;
+  private long previousRowIndex;
+  private long previousRowCount;
+
+  /**
+   * @return a no-op builder that does not collect values and therefore returns {@code null} at {@link #build(long)}
+   */
+  public static OffsetIndexBuilder getNoOpBuilder() {
+    return NO_OP_BUILDER;
+  }
+
+  /**
+   * @return an {@link OffsetIndexBuilder} instance to build an {@link OffsetIndex} object
+   */
+  public static OffsetIndexBuilder getBuilder() {
+    return new OffsetIndexBuilder();
+  }
+
+  private OffsetIndexBuilder() {
+  }
+
+  /**
+   * Adds the specified parameters to this builder. Used by the writers to building up {@link OffsetIndex} objects to be
+   * written to the Parquet file.
+   *
+   * @param compressedPageSize
+   *          the size of the page (including header)
+   * @param rowCount
+   *          the number of rows in the page
+   */
+  public void add(int compressedPageSize, long rowCount) {
+    add(previousOffset + previousPageSize, compressedPageSize, previousRowIndex + previousRowCount);
+    previousRowCount = rowCount;
+  }
+
+  /**
+   * Adds the specified parameters to this builder. Used by the metadata converter to building up {@link OffsetIndex}
+   * objects read from the Parquet file.
+   *
+   * @param offset
+   *          the offset of the page in the file
+   * @param compressedPageSize
+   *          the size of the page (including header)
+   * @param firstRowIndex
+   *          the index of the first row in the page (within the row group)
+   */
+  public void add(long offset, int compressedPageSize, long firstRowIndex) {
+    previousOffset = offset;
+    offsets.add(offset);
+    previousPageSize = compressedPageSize;
+    compressedPageSizes.add(compressedPageSize);
+    previousRowIndex = firstRowIndex;
+    firstRowIndexes.add(firstRowIndex);
+  }
+
+  /**
+   * Builds the offset index. Used by the metadata converter to building up {@link OffsetIndex}
+   * objects read from the Parquet file.
+   *
+   * @return the newly created offset index or {@code null} if the {@link OffsetIndex} object would be empty
+   */
+  public OffsetIndex build() {
+    return build(0);
+  }
+
+  /**
+   * Builds the offset index. Used by the writers to building up {@link OffsetIndex} objects to be
+   * written to the Parquet file.
+   *
+   * @param firstPageOffset
+   *          the actual offset in the file to be used to translate all the collected offsets
+   * @return the newly created offset index or {@code null} if the {@link OffsetIndex} object would be empty
+   */
+  public OffsetIndex build(long firstPageOffset) {
+    if (compressedPageSizes.isEmpty()) {
+      return null;
+    }
+    long[] offsets = this.offsets.toLongArray();
+    if (firstPageOffset != 0) {
+      for (int i = 0, n = offsets.length; i < n; ++i) {
+        offsets[i] += firstPageOffset;
+      }
+    }
+    OffsetIndexImpl offsetIndex = new OffsetIndexImpl();
+    offsetIndex.offsets = offsets;
+    offsetIndex.compressedPageSizes = compressedPageSizes.toIntArray();
+    offsetIndex.firstRowIndexes = firstRowIndexes.toLongArray();
+
+    return offsetIndex;
+  }
+
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index c855339c5..c28649eef 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -20,12 +20,10 @@
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.parquet.column.ParquetProperties;
-import org.junit.Test;
-
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
 import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.mem.MemPageStore;
@@ -33,6 +31,7 @@
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,7 @@ public void testMemColumn() throws Exception {
     ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     columnWriter.write(42l, 0, 0);
+    memColumnsStore.endRecord();
     memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, schema);
@@ -85,6 +85,7 @@ public void testMemColumnBinary() throws Exception {
 
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     columnWriter.write(Binary.fromString("42"), 0, 0);
+    memColumnsStore.endRecord();
     memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
@@ -108,6 +109,7 @@ public void testMemColumnSeveralPages() throws Exception {
     ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
     for (int i = 0; i < 2000; i++) {
       columnWriter.write(42l, 0, 0);
+      memColumnsStore.endRecord();
     }
     memColumnsStore.flush();
 
@@ -141,6 +143,7 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
       } else {
         columnWriter.writeNull(r, d);
       }
+      memColumnsStore.endRecord();
     }
     memColumnsStore.flush();
 
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
index be3a0f9cb..706b00110 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
@@ -56,6 +56,12 @@ public void writePage(BytesInput bytesInput, int valueCount, Statistics statisti
     LOG.debug("page written for {} bytes and {} records", bytesInput.size(), valueCount);
   }
 
+  @Override
+  public void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics<?> statistics,
+      Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+    writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding); // rowCount is not used; delegates to the overload without it
+  }
+
   @Override
   public void writePageV2(int rowCount, int nullCount, int valueCount,
       BytesInput repetitionLevels, BytesInput definitionLevels,
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
new file mode 100644
index 000000000..f1706a1e0
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -0,0 +1,949 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.OriginalType.DECIMAL;
+import static org.apache.parquet.schema.OriginalType.UINT_8;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.internal.column.columnindex.BinaryColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.BooleanColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.DoubleColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.FloatColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.IntColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.LongColumnIndexBuilder;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ColumnIndexBuilder}.
+ */
+public class TestColumnIndexBuilder {
+
+  @Test
+  public void testBuildBinaryDecimal() { // BINARY/DECIMAL column: checks boundary order, null counts, null pages and min/max values
+    PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
+    assertNull(builder.build());
+    // Scenario 1: page min/max values in no consistent order, with all-null pages mixed in -> UNORDERED.
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("-0.17"), decimalBinary("1234567890.12")));
+    builder.add(stats(type, decimalBinary("-234.23"), null, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, decimalBinary("-9999293.23"), decimalBinary("2348978.45")));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("87656273")));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 3, 3, 0, 4, 2, 0);
+    assertCorrectNullPages(columnIndex, true, false, false, true, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        decimalBinary("1234567890.12"),
+        decimalBinary("-234.23"),
+        null,
+        decimalBinary("2348978.45"),
+        null,
+        null,
+        decimalBinary("87656273"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        decimalBinary("-0.17"),
+        decimalBinary("-234.23"),
+        null,
+        decimalBinary("-9999293.23"),
+        null,
+        null,
+        decimalBinary("87656273"));
+    // Scenario 2: the min/max values of the non-null pages never decrease from page to page -> ASCENDING.
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, decimalBinary("-9999293.23"), decimalBinary("-234.23")));
+    builder.add(stats(type, decimalBinary("-0.17"), decimalBinary("87656273")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("87656273")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("1234567890.12"), null, null, null));
+    builder.add(stats(type, null, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 4, 0, 0, 2, 0, 2, 3, 3);
+    assertCorrectNullPages(columnIndex, true, false, false, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        decimalBinary("-234.23"),
+        decimalBinary("87656273"),
+        null,
+        decimalBinary("87656273"),
+        null,
+        decimalBinary("1234567890.12"),
+        null);
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        decimalBinary("-9999293.23"),
+        decimalBinary("-0.17"),
+        null,
+        decimalBinary("87656273"),
+        null,
+        decimalBinary("1234567890.12"),
+        null);
+    // Scenario 3: the min/max values of the non-null pages never increase from page to page -> DESCENDING.
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("1234567890.12"), null, null, null));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, decimalBinary("1234567890.12"), decimalBinary("87656273")));
+    builder.add(stats(type, decimalBinary("987656273"), decimalBinary("-0.17")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, decimalBinary("-234.23"), decimalBinary("-9999293.23")));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 3, 2, 3, 4, 0, 0, 2, 0);
+    assertCorrectNullPages(columnIndex, true, true, false, true, false, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        null,
+        decimalBinary("1234567890.12"),
+        null,
+        decimalBinary("1234567890.12"),
+        decimalBinary("987656273"),
+        null,
+        decimalBinary("-234.23"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        null,
+        decimalBinary("1234567890.12"),
+        null,
+        decimalBinary("87656273"),
+        decimalBinary("-0.17"),
+        null,
+        decimalBinary("-9999293.23"));
+  }
+
+  @Test
+  public void testBuildBinaryUtf8() { // BINARY/UTF8 column: checks boundary order, null counts, null pages and min/max values
+    PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
+    assertNull(builder.build());
+    // Scenario 1: page min/max values in no consistent order, with all-null pages mixed in -> UNORDERED.
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Jeltz"), stringBinary("Slartibartfast"), null, null));
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Beeblebrox"), stringBinary("Prefect")));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Trilian"), null));
+    builder.add(stats(type, stringBinary("Beeblebrox")));
+    builder.add(stats(type, null, null));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 5, 2, 0, 1, 0, 2);
+    assertCorrectNullPages(columnIndex, true, false, true, true, false, false, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        stringBinary("Slartibartfast"),
+        null,
+        null,
+        stringBinary("Prefect"),
+        stringBinary("Trilian"),
+        stringBinary("Beeblebrox"),
+        null);
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        stringBinary("Jeltz"),
+        null,
+        null,
+        stringBinary("Beeblebrox"),
+        stringBinary("Dent"),
+        stringBinary("Beeblebrox"),
+        null);
+    // Scenario 2: the min/max values of the non-null pages never decrease from page to page -> ASCENDING.
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, stringBinary("Beeblebrox"), stringBinary("Dent"), null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Jeltz")));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Prefect"), null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Slartibartfast")));
+    builder.add(stats(type, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 2, 5, 0, 1, 2, 0, 2);
+    assertCorrectNullPages(columnIndex, false, true, true, false, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        stringBinary("Dent"),
+        null,
+        null,
+        stringBinary("Jeltz"),
+        stringBinary("Prefect"),
+        null,
+        stringBinary("Slartibartfast"),
+        null);
+    assertCorrectValues(columnIndex.getMinValues(),
+        stringBinary("Beeblebrox"),
+        null,
+        null,
+        stringBinary("Dent"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Slartibartfast"),
+        null);
+    // Scenario 3: the min/max values of the non-null pages never increase from page to page -> DESCENDING.
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Slartibartfast")));
+    builder.add(stats(type, null, null, null, null, null));
+    builder.add(stats(type, stringBinary("Prefect"), stringBinary("Jeltz"), null));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Dent")));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, stringBinary("Dent"), stringBinary("Beeblebrox"), null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 5, 1, 0, 2, 2, 2);
+    assertCorrectNullPages(columnIndex, true, false, true, false, false, true, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        stringBinary("Slartibartfast"),
+        null,
+        stringBinary("Prefect"),
+        stringBinary("Dent"),
+        null,
+        null,
+        stringBinary("Dent"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        stringBinary("Slartibartfast"),
+        null,
+        stringBinary("Jeltz"),
+        stringBinary("Dent"),
+        null,
+        null,
+        stringBinary("Beeblebrox"));
+  }
+
+  @Test
+  public void testStaticBuildBinary() { // static ColumnIndexBuilder.build(...) must hand back exactly the data it was given
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(BINARY).as(UTF8).named("test_binary_utf8"),
+        BoundaryOrder.ASCENDING,
+        asList(true, true, false, false, true, false, true, false), // null pages
+        asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L), // null counts; uppercase L so the suffix is not mistaken for the digit 1
+        toBBList( // min values
+            null,
+            null,
+            stringBinary("Beeblebrox"),
+            stringBinary("Dent"),
+            null,
+            stringBinary("Jeltz"),
+            null,
+            stringBinary("Slartibartfast")),
+        toBBList( // max values
+            null,
+            null,
+            stringBinary("Dent"),
+            stringBinary("Dent"),
+            null,
+            stringBinary("Prefect"),
+            null,
+            stringBinary("Slartibartfast")));
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 1, 2, 3, 4, 5, 6, 7, 8);
+    assertCorrectNullPages(columnIndex, true, true, false, false, true, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(),
+        null,
+        null,
+        stringBinary("Dent"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Prefect"),
+        null,
+        stringBinary("Slartibartfast"));
+    assertCorrectValues(columnIndex.getMinValues(),
+        null,
+        null,
+        stringBinary("Beeblebrox"),
+        stringBinary("Dent"),
+        null,
+        stringBinary("Jeltz"),
+        null,
+        stringBinary("Slartibartfast"));
+  }
+
+  @Test
+  public void testBuildBoolean() { // BOOLEAN column: checks boundary order, null counts, null pages and min/max values
+    PrimitiveType type = Types.required(BOOLEAN).named("test_boolean");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class));
+    assertNull(builder.build());
+    // Scenario 1: page min/max values in no consistent order -> UNORDERED.
+    builder.add(stats(type, false, true));
+    builder.add(stats(type, true, false, null));
+    builder.add(stats(type, true, true, null, null));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, false, false));
+    ColumnIndex columnIndex = builder.build();
+    assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 0, 1, 2, 3, 0);
+    assertCorrectNullPages(columnIndex, false, false, false, true, false);
+    assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false);
+    assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false);
+    // Scenario 2: the min/max values of the non-null pages never decrease from page to page -> ASCENDING.
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, false, false));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, false, true, null));
+    builder.add(stats(type, false, true, null, null));
+    builder.add(stats(type, null, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 3, 4, 1, 2, 3);
+    assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null);
+    // Scenario 3: the min/max values of the non-null pages never increase from page to page -> DESCENDING.
+    builder = ColumnIndexBuilder.getBuilder(type);
+    builder.add(stats(type, null, null));
+    builder.add(stats(type, true, true));
+    builder.add(stats(type, null, null, null));
+    builder.add(stats(type, null, null, null, null));
+    builder.add(stats(type, true, false, null));
+    builder.add(stats(type, false, false, null, null));
+    builder.add(stats(type, null, null, null));
+    columnIndex = builder.build();
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 2, 0, 3, 4, 1, 2, 3);
+    assertCorrectNullPages(columnIndex, true, false, true, true, false, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), null, true, null, null, true, false, null);
+    assertCorrectValues(columnIndex.getMinValues(), null, true, null, null, false, false, null);
+  }
+
+  @Test
+  public void testStaticBuildBoolean() { // static ColumnIndexBuilder.build(...) must hand back exactly the data it was given
+    ColumnIndex columnIndex = ColumnIndexBuilder.build(
+        Types.required(BOOLEAN).named("test_boolean"),
+        BoundaryOrder.DESCENDING,
+        asList(false, true, false, true, false, true), // null pages
+        asList(9L, 8L, 7L, 6L, 5L, 0L), // null counts; uppercase L so the suffix is not mistaken for the digit 1
+        toBBList(false, null, false, null, true, null), // min values
+        toBBList(true, null, false, null, true, null)); // max values
+    assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder());
+    assertCorrectNullCounts(columnIndex, 9, 8, 7, 6, 5, 0);
+    assertCorrectNullPages(columnIndex, false, true, false, true, false, true);
+    assertCorrectValues(columnIndex.getMaxValues(), true, null, false, null, true, null);
+    assertCorrectValues(columnIndex.getMinValues(), false, null, false, null, true, null);
+  }
+
+  @Test
+  public void testBuildDouble() {
+    PrimitiveType type = Types.required(DOUBLE).named("test_double");
+    ColumnIndexBuilder indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(indexBuilder, instanceOf(DoubleColumnIndexBuilder.class));
+    assertNull(indexBuilder.build());
+    // Case 1: page min/max values in no particular order -> UNORDERED is expected.
+    indexBuilder.add(stats(type, -4.2, -4.1));
+    indexBuilder.add(stats(type, -11.7, 7.0, null));
+    indexBuilder.add(stats(type, 2.2, 2.2, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 1.9, 2.32));
+    indexBuilder.add(stats(type, -21.0, 8.1));
+    ColumnIndex index = indexBuilder.build();
+    assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(index, false, false, false, true, false, false);
+    assertCorrectValues(index.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1);
+    assertCorrectValues(index.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0);
+    // Case 2: rising min/max values with all-null pages in between -> ASCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -532.3, -345.2, null, null));
+    indexBuilder.add(stats(type, -234.7, -234.6, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, -234.6, 2.99999));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 3.0, 42.83));
+    indexBuilder.add(stats(type, null, null));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.ASCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(index, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(index.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null);
+    assertCorrectValues(index.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null);
+    // Case 3: falling min/max values with all-null pages in between -> DESCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null, null, null, null));
+    indexBuilder.add(stats(type, 532.3, 345.2));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 234.7, 234.6, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 234.69, -2.99999));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -3.0, -42.83));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.DESCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(index, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(index.getMaxValues(), null, 532.3, null, 234.7, null, 234.69, null, null, -3.0);
+    assertCorrectValues(index.getMinValues(), null, 345.2, null, 234.6, null, -2.99999, null, null, -42.83);
+  }
+
+  @Test
+  public void testStaticBuildDouble() {
+    // Everything handed to the static build(...) is expected to be readable back unchanged.
+    PrimitiveType type = Types.required(DOUBLE).named("test_double");
+    ColumnIndex index = ColumnIndexBuilder.build(type, BoundaryOrder.UNORDERED,
+        asList(false, false, false, false, false, false), // null pages
+        asList(0L, 1L, 2L, 3L, 4L, 5L), // null counts
+        toBBList(-1.0, -2.0, -3.0, -4.0, -5.0, -6.0), // min values
+        toBBList(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)); // max values
+    assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 0, 1, 2, 3, 4, 5);
+    assertCorrectNullPages(index, false, false, false, false, false, false);
+    assertCorrectValues(index.getMaxValues(), 1.0, 2.0, 3.0, 4.0, 5.0, 6.0);
+    assertCorrectValues(index.getMinValues(), -1.0, -2.0, -3.0, -4.0, -5.0, -6.0);
+  }
+
+  @Test
+  public void testBuildFloat() {
+    PrimitiveType type = Types.required(FLOAT).named("test_float");
+    ColumnIndexBuilder indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(indexBuilder, instanceOf(FloatColumnIndexBuilder.class));
+    assertNull(indexBuilder.build());
+    // Case 1: page min/max values in no particular order -> UNORDERED is expected.
+    indexBuilder.add(stats(type, -4.2f, -4.1f));
+    indexBuilder.add(stats(type, -11.7f, 7.0f, null));
+    indexBuilder.add(stats(type, 2.2f, 2.2f, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 1.9f, 2.32f));
+    indexBuilder.add(stats(type, -21.0f, 8.1f));
+    ColumnIndex index = indexBuilder.build();
+    assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(index, false, false, false, true, false, false);
+    assertCorrectValues(index.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f);
+    assertCorrectValues(index.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f);
+    // Case 2: rising min/max values with all-null pages in between -> ASCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -532.3f, -345.2f, null, null));
+    indexBuilder.add(stats(type, -300.6f, -234.7f, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, -234.6f, 2.99999f));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 3.0f, 42.83f));
+    indexBuilder.add(stats(type, null, null));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.ASCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(index, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(index.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null);
+    assertCorrectValues(index.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null);
+    // Case 3: falling min/max values with all-null pages in between -> DESCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null, null, null, null));
+    indexBuilder.add(stats(type, 532.3f, 345.2f));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 234.7f, 234.6f, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 234.6f, -2.99999f));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -3.0f, -42.83f));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.DESCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(index, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(index.getMaxValues(), null, 532.3f, null, 234.7f, null, 234.6f, null, null, -3.0f);
+    assertCorrectValues(index.getMinValues(), null, 345.2f, null, 234.6f, null, -2.99999f, null, null, -42.83f);
+  }
+
+  @Test
+  public void testStaticBuildFloat() {
+    // Everything handed to the static build(...) is expected to be readable back unchanged.
+    PrimitiveType type = Types.required(FLOAT).named("test_float");
+    ColumnIndex index = ColumnIndexBuilder.build(type, BoundaryOrder.ASCENDING,
+        asList(true, true, true, false, false, false), // null pages
+        asList(9L, 8L, 7L, 6L, 0L, 0L), // null counts
+        toBBList(null, null, null, -3.0f, -2.0f, 0.1f), // min values
+        toBBList(null, null, null, -2.0f, 0.0f, 6.0f)); // max values
+    assertEquals(BoundaryOrder.ASCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 9, 8, 7, 6, 0, 0);
+    assertCorrectNullPages(index, true, true, true, false, false, false);
+    assertCorrectValues(index.getMaxValues(), null, null, null, -2.0f, 0.0f, 6.0f);
+    assertCorrectValues(index.getMinValues(), null, null, null, -3.0f, -2.0f, 0.1f);
+  }
+
+  @Test
+  public void testBuildInt32() {
+    PrimitiveType type = Types.required(INT32).named("test_int32");
+    ColumnIndexBuilder indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(indexBuilder, instanceOf(IntColumnIndexBuilder.class));
+    assertNull(indexBuilder.build());
+    // Case 1: page min/max values in no particular order -> UNORDERED is expected.
+    indexBuilder.add(stats(type, -4, 10));
+    indexBuilder.add(stats(type, -11, 7, null));
+    indexBuilder.add(stats(type, 2, 2, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 1, 2));
+    indexBuilder.add(stats(type, -21, 8));
+    ColumnIndex index = indexBuilder.build();
+    assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(index, false, false, false, true, false, false);
+    assertCorrectValues(index.getMaxValues(), 10, 7, 2, null, 2, 8);
+    assertCorrectValues(index.getMinValues(), -4, -11, 2, null, 1, -21);
+    // Case 2: rising min/max values with all-null pages in between -> ASCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -532, -345, null, null));
+    indexBuilder.add(stats(type, -500, -42, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, -42, 2));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 3, 42));
+    indexBuilder.add(stats(type, null, null));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.ASCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(index, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(index.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null);
+    assertCorrectValues(index.getMinValues(), null, -532, -500, null, null, -42, null, 3, null);
+    // Case 3: falling min/max values with all-null pages in between -> DESCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null, null, null, null));
+    indexBuilder.add(stats(type, 532, 345));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 234, 42, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 42, -2));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -3, -42));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.DESCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(index, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(index.getMaxValues(), null, 532, null, 234, null, 42, null, null, -3);
+    assertCorrectValues(index.getMinValues(), null, 345, null, 42, null, -2, null, null, -42);
+  }
+
+  @Test
+  public void testStaticBuildInt32() {
+    // Fixture keeps min <= max on each non-null page; build(...) output is expected to mirror its input.
+    PrimitiveType type = Types.required(INT32).named("test_int32");
+    ColumnIndex index = ColumnIndexBuilder.build(type, BoundaryOrder.DESCENDING,
+        asList(false, false, false, true, true, true), // null pages
+        asList(0L, 10L, 0L, 3L, 5L, 7L), // null counts
+        toBBList(9, 7, 5, null, null, null), // min values
+        toBBList(10, 8, 6, null, null, null)); // max values
+    assertEquals(BoundaryOrder.DESCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 0, 10, 0, 3, 5, 7);
+    assertCorrectNullPages(index, false, false, false, true, true, true);
+    assertCorrectValues(index.getMaxValues(), 10, 8, 6, null, null, null);
+    assertCorrectValues(index.getMinValues(), 9, 7, 5, null, null, null);
+  }
+
+  @Test
+  public void testBuildUInt8() {
+    PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8");
+    ColumnIndexBuilder indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(indexBuilder, instanceOf(IntColumnIndexBuilder.class));
+    assertNull(indexBuilder.build());
+    // Case 1: page min/max values in no particular order -> UNORDERED is expected.
+    indexBuilder.add(stats(type, 4, 10));
+    indexBuilder.add(stats(type, 11, 17, null));
+    indexBuilder.add(stats(type, 2, 2, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 1, 0xFF));
+    indexBuilder.add(stats(type, 0xEF, 0xFA));
+    ColumnIndex index = indexBuilder.build();
+    assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 0, 1, 2, 3, 0, 0);
+    assertCorrectNullPages(index, false, false, false, true, false, false);
+    assertCorrectValues(index.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA);
+    assertCorrectValues(index.getMinValues(), 4, 11, 2, null, 1, 0xEF);
+    // Case 2: rising min/max values with all-null pages in between -> ASCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 0, 0, null, null));
+    indexBuilder.add(stats(type, 0, 42, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 42, 0xEE));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 0xEF, 0xFF));
+    indexBuilder.add(stats(type, null, null));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.ASCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(index, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(index.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null);
+    assertCorrectValues(index.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null);
+    // Case 3: falling min/max values with all-null pages in between -> DESCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null, null, null, null));
+    indexBuilder.add(stats(type, 0xFF, 0xFF));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 0xEF, 0xEA, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 0xEE, 42));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 41, 0));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.DESCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(index, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(index.getMaxValues(), null, 0xFF, null, 0xEF, null, 0xEE, null, null, 41);
+    assertCorrectValues(index.getMinValues(), null, 0xFF, null, 0xEA, null, 42, null, null, 0);
+  }
+
+  @Test
+  public void testBuildInt64() {
+    PrimitiveType type = Types.required(INT64).named("test_int64");
+    ColumnIndexBuilder indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    assertThat(indexBuilder, instanceOf(LongColumnIndexBuilder.class));
+    assertNull(indexBuilder.build());
+    // Case 1: page min/max values in no particular order -> UNORDERED is expected.
+    indexBuilder.add(stats(type, -4L, 10L));
+    indexBuilder.add(stats(type, -11L, 7L, null));
+    indexBuilder.add(stats(type, 2L, 2L, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 1L, 2L));
+    indexBuilder.add(stats(type, -21L, 8L));
+    ColumnIndex index = indexBuilder.build();
+    assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 0L, 1L, 2L, 3L, 0L, 0L);
+    assertCorrectNullPages(index, false, false, false, true, false, false);
+    assertCorrectValues(index.getMaxValues(), 10L, 7L, 2L, null, 2L, 8L);
+    assertCorrectValues(index.getMinValues(), -4L, -11L, 2L, null, 1L, -21L);
+    // Case 2: rising min/max values with all-null pages in between -> ASCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -532L, -345L, null, null));
+    indexBuilder.add(stats(type, -234L, -42L, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, -42L, 2L));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -3L, 42L));
+    indexBuilder.add(stats(type, null, null));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.ASCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 2, 2, 1, 2, 3, 0, 2, 0, 2);
+    assertCorrectNullPages(index, true, false, false, true, true, false, true, false, true);
+    assertCorrectValues(index.getMaxValues(), null, -345L, -42L, null, null, 2L, null, 42L, null);
+    assertCorrectValues(index.getMinValues(), null, -532L, -234L, null, null, -42L, null, -3L, null);
+    // Case 3: falling min/max values with all-null pages in between -> DESCENDING is expected.
+    indexBuilder = ColumnIndexBuilder.getBuilder(type);
+    indexBuilder.add(stats(type, null, null, null, null, null));
+    indexBuilder.add(stats(type, 532L, 345L));
+    indexBuilder.add(stats(type, null, null, null));
+    indexBuilder.add(stats(type, 234L, 42L, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, 42L, -2L));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, null, null));
+    indexBuilder.add(stats(type, -3L, -42L));
+    index = indexBuilder.build();
+    assertEquals(BoundaryOrder.DESCENDING, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 5, 0, 3, 1, 2, 0, 2, 2, 0);
+    assertCorrectNullPages(index, true, false, true, false, true, false, true, true, false);
+    assertCorrectValues(index.getMaxValues(), null, 532L, null, 234L, null, 42L, null, null, -3L);
+    assertCorrectValues(index.getMinValues(), null, 345L, null, 42L, null, -2L, null, null, -42L);
+  }
+
+  @Test
+  public void testStaticBuildInt64() {
+    // Everything handed to the static build(...) is expected to be readable back unchanged.
+    PrimitiveType type = Types.required(INT64).named("test_int64");
+    ColumnIndex index = ColumnIndexBuilder.build(type, BoundaryOrder.UNORDERED,
+        asList(true, false, true, false, true, false), // null pages
+        asList(1L, 2L, 3L, 4L, 5L, 6L), // null counts
+        toBBList(null, 2L, null, 4L, null, 9L), // min values
+        toBBList(null, 3L, null, 15L, null, 10L)); // max values
+    assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
+    assertCorrectNullCounts(index, 1, 2, 3, 4, 5, 6);
+    assertCorrectNullPages(index, true, false, true, false, true, false);
+    assertCorrectValues(index.getMaxValues(), null, 3L, null, 15L, null, 10L);
+    assertCorrectValues(index.getMinValues(), null, 2L, null, 4L, null, 9L);
+  }
+
+  @Test
+  public void testNoOpBuilder() {
+    ColumnIndexBuilder noOp = ColumnIndexBuilder.getNoOpBuilder();
+    noOp.add(stats(Types.required(BINARY).as(UTF8).named("test_binary_utf8"), stringBinary("Jeltz"),
+        stringBinary("Slartibartfast"), null, null));
+    noOp.add(stats(Types.required(BOOLEAN).named("test_boolean"), true, true, null, null));
+    noOp.add(stats(Types.required(DOUBLE).named("test_double"), null, null, null));
+    noOp.add(stats(Types.required(INT32).named("test_int32"), null, null));
+    noOp.add(stats(Types.required(INT64).named("test_int64"), -234L, -42L, null));
+    assertNull(noOp.build()); // statistics of mixed types were added, yet no index is expected
+  }
+
+  /** Converts each Binary with toByteBuffer(), keeping the argument order;
+   *  a null element becomes an empty (zero-length) buffer. */
+  private static List<ByteBuffer> toBBList(Binary... values) {
+    List<ByteBuffer> result = new ArrayList<>(values.length);
+    for (Binary binary : values) {
+      result.add(binary == null
+          ? ByteBuffer.allocate(0)
+          : binary.toByteBuffer());
+    }
+    return result;
+  }
+
+  /** Wraps the BytesUtils.booleanToBytes encoding of each value, keeping the argument order;
+   *  a null element becomes an empty (zero-length) buffer. */
+  private static List<ByteBuffer> toBBList(Boolean... values) {
+    List<ByteBuffer> result = new ArrayList<>(values.length);
+    for (Boolean flag : values) {
+      result.add(flag == null
+          ? ByteBuffer.allocate(0)
+          : ByteBuffer.wrap(BytesUtils.booleanToBytes(flag)));
+    }
+    return result;
+  }
+
+  /** Wraps the BytesUtils.longToBytes encoding of each value's doubleToLongBits, keeping the order;
+   *  a null element becomes an empty (zero-length) buffer. */
+  private static List<ByteBuffer> toBBList(Double... values) {
+    List<ByteBuffer> result = new ArrayList<>(values.length);
+    for (Double number : values) {
+      result.add(number == null
+          ? ByteBuffer.allocate(0)
+          : ByteBuffer.wrap(BytesUtils.longToBytes(Double.doubleToLongBits(number))));
+    }
+    return result;
+  }
+
+  /** Wraps the BytesUtils.intToBytes encoding of each value's floatToIntBits, keeping the order;
+   *  a null element becomes an empty (zero-length) buffer. */
+  private static List<ByteBuffer> toBBList(Float... values) {
+    List<ByteBuffer> result = new ArrayList<>(values.length);
+    for (Float number : values) {
+      result.add(number == null
+          ? ByteBuffer.allocate(0)
+          : ByteBuffer.wrap(BytesUtils.intToBytes(Float.floatToIntBits(number))));
+    }
+    return result;
+  }
+
+  /** Wraps the BytesUtils.intToBytes encoding of each value, keeping the argument order;
+   *  a null element becomes an empty (zero-length) buffer. */
+  private static List<ByteBuffer> toBBList(Integer... values) {
+    List<ByteBuffer> result = new ArrayList<>(values.length);
+    for (Integer number : values) {
+      result.add(number == null
+          ? ByteBuffer.allocate(0)
+          : ByteBuffer.wrap(BytesUtils.intToBytes(number)));
+    }
+    return result;
+  }
+
+  /** Wraps the BytesUtils.longToBytes encoding of each value, keeping the argument order;
+   *  a null element becomes an empty (zero-length) buffer. */
+  private static List<ByteBuffer> toBBList(Long... values) {
+    List<ByteBuffer> result = new ArrayList<>(values.length);
+    for (Long number : values) {
+      result.add(number == null
+          ? ByteBuffer.allocate(0)
+          : ByteBuffer.wrap(BytesUtils.longToBytes(number)));
+    }
+    return result;
+  }
+
+  private static Binary decimalBinary(String num) { // wraps only the unscaled value of num; the scale is dropped
+    return Binary.fromConstantByteArray(new BigDecimal(num).unscaledValue().toByteArray());
+  }
+
+  private static Binary stringBinary(String str) { // short alias for Binary.fromString
+    return Binary.fromString(str);
+  }
+
+  /** Checks each buffer's backing array against the expected Binary bytes; null expects an empty buffer. */
+  private static void assertCorrectValues(List<ByteBuffer> values, Binary... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int page = 0; page < expectedValues.length; ++page) {
+      ByteBuffer actual = values.get(page);
+      if (expectedValues[page] == null) {
+        assertFalse("The byte buffer should be empty for null pages", actual.hasRemaining());
+      } else {
+        assertArrayEquals("Invalid value for page " + page, expectedValues[page].getBytesUnsafe(), actual.array());
+      }
+    }
+  }
+
+  /** Checks 1-byte buffers (non-zero means true) against the expected values; null expects an empty buffer. */
+  private static void assertCorrectValues(List<ByteBuffer> values, Boolean... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int page = 0; page < expectedValues.length; ++page) {
+      ByteBuffer actual = values.get(page);
+      if (expectedValues[page] == null) {
+        assertFalse("The byte buffer should be empty for null pages", actual.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 1 byte long for boolean", 1, actual.remaining());
+        assertEquals("Invalid value for page " + page, expectedValues[page].booleanValue(), actual.get(0) != 0);
+      }
+    }
+  }
+
+  /** Checks 8-byte buffers via getDouble(0) with zero tolerance; null expects an empty buffer. */
+  private static void assertCorrectValues(List<ByteBuffer> values, Double... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int page = 0; page < expectedValues.length; ++page) {
+      ByteBuffer actual = values.get(page);
+      if (expectedValues[page] == null) {
+        assertFalse("The byte buffer should be empty for null pages", actual.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 8 bytes long for double", 8, actual.remaining());
+        assertEquals("Invalid value for page " + page, expectedValues[page].doubleValue(), actual.getDouble(0), 0.0);
+      }
+    }
+  }
+
+  /** Checks 4-byte buffers via getFloat(0) with zero tolerance; null expects an empty buffer. */
+  private static void assertCorrectValues(List<ByteBuffer> values, Float... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int page = 0; page < expectedValues.length; ++page) {
+      ByteBuffer actual = values.get(page);
+      if (expectedValues[page] == null) {
+        assertFalse("The byte buffer should be empty for null pages", actual.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 4 bytes long for float", 4, actual.remaining());
+        assertEquals("Invalid value for page " + page, expectedValues[page].floatValue(), actual.getFloat(0), 0.0f);
+      }
+    }
+  }
+
+  /** Checks 4-byte buffers via getInt(0) against the expected values; null expects an empty buffer. */
+  private static void assertCorrectValues(List<ByteBuffer> values, Integer... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int page = 0; page < expectedValues.length; ++page) {
+      ByteBuffer actual = values.get(page);
+      if (expectedValues[page] == null) {
+        assertFalse("The byte buffer should be empty for null pages", actual.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 4 bytes long for int32", 4, actual.remaining());
+        assertEquals("Invalid value for page " + page, expectedValues[page].intValue(), actual.getInt(0));
+      }
+    }
+  }
+
+  /** Checks 8-byte buffers via getLong(0) using the full 64-bit expectation; null expects an empty buffer. */
+  private static void assertCorrectValues(List<ByteBuffer> values, Long... expectedValues) {
+    assertEquals(expectedValues.length, values.size());
+    for (int page = 0; page < expectedValues.length; ++page) {
+      ByteBuffer actual = values.get(page);
+      if (expectedValues[page] == null) {
+        assertFalse("The byte buffer should be empty for null pages", actual.hasRemaining());
+      } else {
+        assertEquals("The byte buffer should be 8 bytes long for int64", 8, actual.remaining());
+        assertEquals("Invalid value for page " + page, expectedValues[page].longValue(), actual.getLong(0));
+      }
+    }
+  }
+
+  private static void assertCorrectNullCounts(ColumnIndex columnIndex, long... expectedNullCounts) {
+    List<Long> actual = columnIndex.getNullCounts(); // compared position by position below
+    assertEquals(expectedNullCounts.length, actual.size());
+    for (int page = 0; page < expectedNullCounts.length; ++page) {
+      assertEquals("Invalid null count at page " + page, expectedNullCounts[page], actual.get(page).longValue());
+    }
+  }
+
+  private static void assertCorrectNullPages(ColumnIndex columnIndex, boolean... expectedNullPages) {
+    List<Boolean> actual = columnIndex.getNullPages(); // compared position by position below
+    assertEquals(expectedNullPages.length, actual.size());
+    for (int page = 0; page < expectedNullPages.length; ++page) {
+      assertEquals("Invalid null pages at page " + page, expectedNullPages[page], actual.get(page).booleanValue());
+    }
+  }
+
+  private static Statistics<?> stats(PrimitiveType type, Object... values) {
+    Statistics<?> result = Statistics.createStats(type);
+    for (Object v : values) {
+      if (v == null) {
+        result.incrementNumNulls(); // a null only bumps the null count
+      } else {
+        switch (type.getPrimitiveTypeName()) { // the cast picks the matching updateStats overload
+          case INT32:
+            result.updateStats((int) v);
+            break;
+          case INT64:
+            result.updateStats((long) v);
+            break;
+          case FLOAT:
+            result.updateStats((float) v);
+            break;
+          case DOUBLE:
+            result.updateStats((double) v);
+            break;
+          case BOOLEAN:
+            result.updateStats((boolean) v);
+            break;
+          case BINARY:
+          case FIXED_LEN_BYTE_ARRAY:
+          case INT96:
+            result.updateStats((Binary) v);
+            break;
+          default:
+            fail("Unsupported value type for stats: " + v.getClass());
+        }
+      }
+    }
+    return result;
+  }
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
new file mode 100644
index 000000000..6207084ba
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestOffsetIndexBuilder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.junit.Test;
+
+/**
+ * Tests for {@link OffsetIndexBuilder}.
+ */
+public class TestOffsetIndexBuilder {
+  @Test
+  public void testBuilderWithSizeAndRowCount() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    assertNull(builder.build());
+    assertNull(builder.build(1234));
+    // add(compressedPageSize, rowCount): offsets and first-row indexes are expected to accumulate.
+    builder.add(1000, 10);
+    builder.add(2000, 19);
+    builder.add(3000, 27);
+    builder.add(1200, 9);
+    assertCorrectValues(builder.build(),
+        0, 1000, 0,
+        1000, 2000, 10,
+        3000, 3000, 29,
+        6000, 1200, 56);
+    assertCorrectValues(builder.build(10000), // every offset is expected to be shifted by 10000
+        10000, 1000, 0,
+        11000, 2000, 10,
+        13000, 3000, 29,
+        16000, 1200, 56);
+  }
+
+  @Test
+  public void testNoOpBuilderWithSizeAndRowCount() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getNoOpBuilder();
+    builder.add(1, 2);
+    builder.add(3, 4);
+    builder.add(5, 6);
+    builder.add(7, 8);
+    assertNull(builder.build()); // the no-op builder is expected to build nothing
+    assertNull(builder.build(1000));
+  }
+
+  @Test
+  public void testBuilderWithOffsetSizeIndex() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    assertNull(builder.build());
+    assertNull(builder.build(1234));
+    // add(offset, compressedPageSize, firstRowIndex): values are expected to be stored as given.
+    builder.add(1000, 10000, 0);
+    builder.add(22000, 12000, 100);
+    builder.add(48000, 22000, 211);
+    builder.add(90000, 30000, 361);
+    assertCorrectValues(builder.build(),
+        1000, 10000, 0,
+        22000, 12000, 100,
+        48000, 22000, 211,
+        90000, 30000, 361);
+    assertCorrectValues(builder.build(100000), // every offset is expected to be shifted by 100000
+        101000, 10000, 0,
+        122000, 12000, 100,
+        148000, 22000, 211,
+        190000, 30000, 361);
+  }
+
+  @Test
+  public void testNoOpBuilderWithOffsetSizeIndex() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getNoOpBuilder();
+    builder.add(1, 2, 3);
+    builder.add(4, 5, 6);
+    builder.add(7, 8, 9);
+    builder.add(10, 11, 12);
+    assertNull(builder.build()); // the no-op builder is expected to build nothing
+    assertNull(builder.build(1000));
+  }
+
+  private void assertCorrectValues(OffsetIndex offsetIndex, long... offset_size_rowIndex_triplets) {
+    assertEquals(0, offset_size_rowIndex_triplets.length % 3); // one (offset, size, firstRowIndex) triplet per page
+    int pageCount = offset_size_rowIndex_triplets.length / 3;
+    assertEquals("Invalid pageCount", pageCount, offsetIndex.getPageCount());
+    for (int i = 0; i < pageCount; ++i) {
+      assertEquals("Invalid offsetIndex at page " + i, offset_size_rowIndex_triplets[3 * i],
+          offsetIndex.getOffset(i));
+      assertEquals("Invalid compressedPageSize at page " + i, offset_size_rowIndex_triplets[3 * i + 1],
+          offsetIndex.getCompressedPageSize(i));
+      assertEquals("Invalid firstRowIndex at page " + i, offset_size_rowIndex_triplets[3 * i + 2],
+          offsetIndex.getFirstRowIndex(i));
+    }
+  }
+}
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index e7b2446a6..1e8124d54 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -61,6 +61,12 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.yetus</groupId>
+      <artifactId>audience-annotations</artifactId>
+      <version>0.7.0</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 555b8565a..6fce6f205 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -42,7 +42,9 @@
 import org.apache.parquet.format.CompressionCodec;
 import org.apache.parquet.format.PageEncodingStats;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.BoundaryOrder;
 import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.ColumnIndex;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.ColumnOrder;
 import org.apache.parquet.format.ConvertedType;
@@ -53,7 +55,9 @@
 import org.apache.parquet.format.FieldRepetitionType;
 import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.KeyValue;
+import org.apache.parquet.format.OffsetIndex;
 import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageLocation;
 import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.format.SchemaElement;
@@ -65,6 +69,9 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 import org.apache.parquet.schema.GroupType;
@@ -247,6 +254,17 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou
 //      columnChunk.meta_data.index_page_offset = ;
 //      columnChunk.meta_data.key_value_metadata = ; // nothing yet
 
+      IndexReference columnIndexRef = columnMetaData.getColumnIndexReference();
+      if (columnIndexRef != null) {
+        columnChunk.setColumn_index_offset(columnIndexRef.getOffset());
+        columnChunk.setColumn_index_length(columnIndexRef.getLength());
+      }
+      IndexReference offsetIndexRef = columnMetaData.getOffsetIndexReference();
+      if (offsetIndexRef != null) {
+        columnChunk.setOffset_index_offset(offsetIndexRef.getOffset());
+        columnChunk.setOffset_index_length(offsetIndexRef.getLength());
+      }
+
       parquetColumns.add(columnChunk);
     }
     RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
@@ -920,6 +938,8 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
               metaData.num_values,
               metaData.total_compressed_size,
               metaData.total_uncompressed_size);
+          column.setColumnIndexReference(toColumnIndexReference(columnChunk));
+          column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
           // TODO
           // index_page_offset
           // key_value_metadata
@@ -941,6 +961,20 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
         blocks);
   }
 
+  private static IndexReference toColumnIndexReference(ColumnChunk columnChunk) {
+    if (columnChunk.isSetColumn_index_offset() && columnChunk.isSetColumn_index_length()) { // both fields are required
+      return new IndexReference(columnChunk.getColumn_index_offset(), columnChunk.getColumn_index_length());
+    }
+    return null; // offset and/or length is not set: no column index reference for this column chunk
+  }
+
+  private static IndexReference toOffsetIndexReference(ColumnChunk columnChunk) {
+    if (columnChunk.isSetOffset_index_offset() && columnChunk.isSetOffset_index_length()) { // both fields are required
+      return new IndexReference(columnChunk.getOffset_index_offset(), columnChunk.getOffset_index_length());
+    }
+    return null; // offset and/or length is not set: no offset index reference for this column chunk
+  }
+
   private static ColumnPath getPath(ColumnMetaData metaData) {
     String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
     return ColumnPath.get(path);
@@ -1125,4 +1159,78 @@ public void writeDictionaryPageHeader(
     writePageHeader(pageHeader, to);
   }
 
+  private static BoundaryOrder toParquetBoundaryOrder(
+      org.apache.parquet.internal.column.columnindex.BoundaryOrder boundaryOrder) {
+    switch (boundaryOrder) { // maps each internal constant to the same-named org.apache.parquet.format constant
+      case ASCENDING:
+        return BoundaryOrder.ASCENDING;
+      case DESCENDING:
+        return BoundaryOrder.DESCENDING;
+      case UNORDERED:
+        return BoundaryOrder.UNORDERED;
+      default: // any other value is rejected
+        throw new IllegalArgumentException("Unsupported boundary order: " + boundaryOrder);
+    }
+  }
+
+  private static org.apache.parquet.internal.column.columnindex.BoundaryOrder fromParquetBoundaryOrder(
+      BoundaryOrder boundaryOrder) {
+    switch (boundaryOrder) { // maps each org.apache.parquet.format constant to the same-named internal constant
+      case ASCENDING:
+        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING;
+      case DESCENDING:
+        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.DESCENDING;
+      case UNORDERED:
+        return org.apache.parquet.internal.column.columnindex.BoundaryOrder.UNORDERED;
+      default: // any other value is rejected
+        throw new IllegalArgumentException("Unsupported boundary order: " + boundaryOrder);
+    }
+  }
+
+  public static ColumnIndex toParquetColumnIndex(PrimitiveType type,
+      org.apache.parquet.internal.column.columnindex.ColumnIndex columnIndex) {
+    if (!isMinMaxStatsSupported(type) || columnIndex == null) { // nothing to convert: the caller gets null
+      return null;
+    }
+    ColumnIndex parquetColumnIndex = new ColumnIndex(
+        columnIndex.getNullPages(),
+        columnIndex.getMinValues(),
+        columnIndex.getMaxValues(),
+        toParquetBoundaryOrder(columnIndex.getBoundaryOrder()));
+    parquetColumnIndex.setNull_counts(columnIndex.getNullCounts()); // set separately; not a constructor argument
+    return parquetColumnIndex;
+  }
+
+  public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromParquetColumnIndex(PrimitiveType type,
+      ColumnIndex parquetColumnIndex) {
+    if (!isMinMaxStatsSupported(type)) { // no column index is returned for such types
+      return null;
+    }
+    return ColumnIndexBuilder.build(type, // NOTE(review): parquetColumnIndex is not null-checked, unlike toParquetColumnIndex
+        fromParquetBoundaryOrder(parquetColumnIndex.getBoundary_order()),
+        parquetColumnIndex.getNull_pages(),
+        parquetColumnIndex.getNull_counts(),
+        parquetColumnIndex.getMin_values(),
+        parquetColumnIndex.getMax_values());
+  }
+
+  public static OffsetIndex toParquetOffsetIndex(org.apache.parquet.internal.column.columnindex.OffsetIndex offsetIndex) {
+    List<PageLocation> pageLocations = new ArrayList<>(offsetIndex.getPageCount()); // pre-sized: one entry per page
+    for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
+      pageLocations.add(new PageLocation( // arguments: offset, compressed page size, first row index
+          offsetIndex.getOffset(i),
+          offsetIndex.getCompressedPageSize(i),
+          offsetIndex.getFirstRowIndex(i)));
+    }
+    return new OffsetIndex(pageLocations);
+  }
+
+  public static org.apache.parquet.internal.column.columnindex.OffsetIndex fromParquetOffsetIndex(
+      OffsetIndex parquetOffsetIndex) {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    for (PageLocation pageLocation : parquetOffsetIndex.getPage_locations()) { // one builder entry per page location
+      builder.add(pageLocation.getOffset(), pageLocation.getCompressed_page_size(), pageLocation.getFirst_row_index());
+    }
+    return builder.build(); // no-argument build(): no offset shift is requested
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 82c288fe4..064649334 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -37,6 +37,8 @@
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.bytes.ByteBufferAllocator;
@@ -67,6 +69,8 @@
     private Set<Encoding> dlEncodings = new HashSet<Encoding>();
     private List<Encoding> dataEncodings = new ArrayList<Encoding>();
 
+    private ColumnIndexBuilder columnIndexBuilder;
+    private OffsetIndexBuilder offsetIndexBuilder;
     private Statistics totalStatistics;
     private final ByteBufferAllocator allocator;
 
@@ -77,11 +81,25 @@ private ColumnChunkPageWriter(ColumnDescriptor path,
       this.compressor = compressor;
       this.allocator = allocator;
       this.buf = new ConcatenatingByteArrayCollector();
+      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType());
+      this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+    }
+
+    @Override
+    @Deprecated
+    public void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding,
+        Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+      // Switch to the no-op builders so that no column/offset indexes are written for this column chunk
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+      offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+
+      writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding); // -1: no rowCount available
     }
 
     @Override
     public void writePage(BytesInput bytes,
                           int valueCount,
+                          int rowCount,
                           Statistics statistics,
                           Encoding rlEncoding,
                           Encoding dlEncoding,
@@ -121,6 +139,9 @@ public void writePage(BytesInput bytes,
         totalStatistics.mergeStatistics(statistics);
       }
 
+      columnIndexBuilder.add(statistics);
+      offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);
+
       // by concatenating before collecting instead of collecting twice,
       // we only allocate one buffer to copy into instead of multiple.
       buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
@@ -166,6 +187,9 @@ public void writePageV2(
         totalStatistics.mergeStatistics(statistics);
       }
 
+      columnIndexBuilder.add(statistics);
+      offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);
+
       // by concatenating before collecting instead of collecting twice,
       // we only allocate one buffer to copy into instead of multiple.
       buf.collect(
@@ -193,14 +217,20 @@ public long getMemSize() {
     }
 
     public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.startColumn(path, totalValueCount, compressor.getCodecName());
-      if (dictionaryPage != null) {
-        writer.writeDictionaryPage(dictionaryPage);
-        // tracking the dictionary encoding is handled in writeDictionaryPage
-      }
-      writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics,
-          rlEncodings, dlEncodings, dataEncodings);
-      writer.endColumn();
+      writer.writeColumnChunk(
+          path,
+          totalValueCount,
+          compressor.getCodecName(),
+          dictionaryPage,
+          buf,
+          uncompressedLength,
+          compressedLength,
+          totalStatistics,
+          columnIndexBuilder,
+          offsetIndexBuilder,
+          rlEncodings,
+          dlEncodings,
+          dataEncodings);
       if (LOG.isDebugEnabled()) {
         LOG.debug(
             String.format(
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 95aaec423..eda4745b4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -85,10 +85,14 @@
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.yetus.audience.InterfaceAudience.Private;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -903,6 +907,40 @@ private DictionaryPage readCompressedDictionary(
         converter.getEncoding(dictHeader.getEncoding()));
   }
 
+  /**
+   * Reads the column index of the specified column chunk by seeking to the offset of its index reference.
+   *
+   * @param column the column chunk which the column index is to be returned for
+   * @return the column index for the specified column chunk or {@code null} if there is no index
+   * @throws IOException if any I/O error occurs during reading the file
+   */
+  @Private
+  public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOException {
+    IndexReference ref = column.getColumnIndexReference();
+    if (ref == null) { // the column chunk has no column index reference
+      return null;
+    }
+    f.seek(ref.getOffset()); // moves the shared input stream to the start of the index
+    return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), Util.readColumnIndex(f));
+  }
+
+  /**
+   * Reads the offset index of the specified column chunk by seeking to the offset of its index reference.
+   *
+   * @param column the column chunk which the offset index is to be returned for
+   * @return the offset index for the specified column chunk or {@code null} if there is no index
+   * @throws IOException if any I/O error occurs during reading the file
+   */
+  @Private
+  public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOException {
+    IndexReference ref = column.getOffsetIndexReference();
+    if (ref == null) { // the column chunk has no offset index reference
+      return null;
+    }
+    f.seek(ref.getOffset()); // moves the shared input stream to the start of the index
+    return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f));
+  }
+
   @Override
   public void close() throws IOException {
     try {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c98c24796..bd0f6835e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -50,6 +50,7 @@
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -59,6 +60,11 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.SeekableInputStream;
@@ -97,9 +103,17 @@
   // file data
   private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
 
+  // The column/offset indexes: one list per block, holding one entry per column chunk
+  private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
+  private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
+
   // row group data
   private BlockMetaData currentBlock; // appended to by endColumn
 
+  // The column/offset indexes for the current block
+  private List<ColumnIndex> currentColumnIndexes;
+  private List<OffsetIndex> currentOffsetIndexes;
+
   // row group data set at the start of a row group
   private long currentRecordCount; // set in startBlock
 
@@ -109,6 +123,9 @@
   private long uncompressedLength;
   private long compressedLength;
   private Statistics currentStatistics; // accumulated in writePage(s)
+  private ColumnIndexBuilder columnIndexBuilder;
+  private OffsetIndexBuilder offsetIndexBuilder;
+  private long firstPageOffset;
 
   // column chunk data set at the start of a column
   private CompressionCodecName currentChunkCodec; // set in startColumn
@@ -296,6 +313,9 @@ public void startBlock(long recordCount) throws IOException {
 
     currentBlock = new BlockMetaData();
     currentRecordCount = recordCount;
+
+    currentColumnIndexes = new ArrayList<>();
+    currentOffsetIndexes = new ArrayList<>();
   }
 
   /**
@@ -320,6 +340,10 @@ public void startColumn(ColumnDescriptor descriptor,
     uncompressedLength = 0;
     // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
     currentStatistics = null;
+
+    columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType);
+    offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+    firstPageOffset = -1;
   }
 
   /**
@@ -367,6 +391,9 @@ public void writeDataPage(
       Encoding dlEncoding,
       Encoding valuesEncoding) throws IOException {
     state = state.write();
+    // We are unable to build indexes without rowCount so skip them for this column
+    offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
     long beforeHeader = out.getPos();
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
     int compressedPageSize = (int)bytes.size();
@@ -398,8 +425,50 @@ public void writeDataPage(
    * @param dlEncoding encoding of the definition level
    * @param valuesEncoding encoding of values
    * @throws IOException if there is an error while writing
+   * @deprecated this method does not support writing column indexes; Use
+   *             {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead
+   */
+  @Deprecated
+  public void writeDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      Statistics statistics,
+      Encoding rlEncoding,
+      Encoding dlEncoding,
+      Encoding valuesEncoding) throws IOException {
+    // Indexes cannot be built without rowCount, so switch to the no-op builders for this column
+    offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+  }
+
+  /**
+   * Writes a single page and records its written size and row count in the offset index builder
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param statistics the statistics of the page
+   * @param rowCount the number of rows in the page
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   * @throws IOException if any I/O error occurs during writing the file
    */
   public void writeDataPage(
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      Statistics statistics,
+      long rowCount,
+      Encoding rlEncoding,
+      Encoding dlEncoding,
+      Encoding valuesEncoding) throws IOException {
+    long beforeHeader = out.getPos(); // output position before anything of this page is written
+    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+    // Size = number of bytes written by innerWriteDataPage. NOTE(review): the (int) narrowing is unchecked.
+    offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+  }
+
+  private void innerWriteDataPage(
       int valueCount, int uncompressedPageSize,
       BytesInput bytes,
       Statistics statistics,
@@ -408,8 +477,11 @@ public void writeDataPage(
       Encoding valuesEncoding) throws IOException {
     state = state.write();
     long beforeHeader = out.getPos();
+    if (firstPageOffset == -1) {
+      firstPageOffset = beforeHeader;
+    }
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
-    int compressedPageSize = (int)bytes.size();
+    int compressedPageSize = (int) bytes.size();
     metadataConverter.writeDataPageHeader(
         uncompressedPageSize, compressedPageSize,
         valueCount,
@@ -431,6 +503,8 @@ public void writeDataPage(
       currentStatistics.mergeStatistics(statistics);
     }
 
+    columnIndexBuilder.add(statistics);
+
     encodingStatsBuilder.addDataEncoding(valuesEncoding);
     currentEncodings.add(rlEncoding);
     currentEncodings.add(dlEncoding);
@@ -438,25 +512,47 @@ public void writeDataPage(
   }
 
   /**
-   * writes a number of pages at once
-   * @param bytes bytes to be written including page headers
+   * Writes a column chunk at once
+   * @param descriptor the descriptor of the column
+   * @param valueCount the value count in this column
+   * @param compressionCodecName the name of the compression codec used for compressing the pages
+   * @param dictionaryPage the dictionary page for this column chunk (might be null)
+   * @param bytes the encoded pages including page headers to be written as is
    * @param uncompressedTotalPageSize total uncompressed size (without page headers)
    * @param compressedTotalPageSize total compressed size (without page headers)
+   * @param totalStats accumulated statistics for the column chunk
+   * @param columnIndexBuilder the builder object for the column index
+   * @param offsetIndexBuilder the builder object for the offset index
+   * @param rlEncodings the RL encodings used in this column chunk
+   * @param dlEncodings the DL encodings used in this column chunk
+   * @param dataEncodings the data encodings used in this column chunk
    * @throws IOException if there is an error while writing
    */
-  void writeDataPages(BytesInput bytes,
-                      long uncompressedTotalPageSize,
-                      long compressedTotalPageSize,
-                      Statistics totalStats,
-                      Set<Encoding> rlEncodings,
-                      Set<Encoding> dlEncodings,
-                      List<Encoding> dataEncodings) throws IOException {
+  void writeColumnChunk(ColumnDescriptor descriptor,
+      long valueCount,
+      CompressionCodecName compressionCodecName,
+      DictionaryPage dictionaryPage,
+      BytesInput bytes,
+      long uncompressedTotalPageSize,
+      long compressedTotalPageSize,
+      Statistics<?> totalStats,
+      ColumnIndexBuilder columnIndexBuilder,
+      OffsetIndexBuilder offsetIndexBuilder,
+      Set<Encoding> rlEncodings,
+      Set<Encoding> dlEncodings,
+      List<Encoding> dataEncodings) throws IOException {
+    startColumn(descriptor, valueCount, compressionCodecName);
+
     state = state.write();
+    if (dictionaryPage != null) {
+      writeDictionaryPage(dictionaryPage);
+    }
     LOG.debug("{}: write data pages", out.getPos());
     long headersSize = bytes.size() - compressedTotalPageSize;
     this.uncompressedLength += uncompressedTotalPageSize + headersSize;
     this.compressedLength += compressedTotalPageSize + headersSize;
     LOG.debug("{}: write data pages content", out.getPos());
+    firstPageOffset = out.getPos();
     bytes.writeAllTo(out);
     encodingStatsBuilder.addDataEncodings(dataEncodings);
     if (rlEncodings.isEmpty()) {
@@ -466,6 +562,11 @@ void writeDataPages(BytesInput bytes,
     currentEncodings.addAll(dlEncodings);
     currentEncodings.addAll(dataEncodings);
     currentStatistics = totalStats;
+
+    this.columnIndexBuilder = columnIndexBuilder;
+    this.offsetIndexBuilder = offsetIndexBuilder;
+
+    endColumn();
   }
 
   /**
@@ -475,6 +576,8 @@ void writeDataPages(BytesInput bytes,
   public void endColumn() throws IOException {
     state = state.endColumn();
     LOG.debug("{}: end column", out.getPos());
+    currentColumnIndexes.add(columnIndexBuilder.build());
+    currentOffsetIndexes.add(offsetIndexBuilder.build(firstPageOffset));
     currentBlock.addColumn(ColumnChunkMetaData.get(
         currentChunkPath,
         currentChunkType,
@@ -490,6 +593,8 @@ public void endColumn() throws IOException {
     this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
     this.uncompressedLength = 0;
     this.compressedLength = 0;
+    columnIndexBuilder = null;
+    offsetIndexBuilder = null;
   }
 
   /**
@@ -501,6 +606,10 @@ public void endBlock() throws IOException {
     LOG.debug("{}: end block", out.getPos());
     currentBlock.setRowCount(currentRecordCount);
     blocks.add(currentBlock);
+    columnIndexes.add(currentColumnIndexes);
+    offsetIndexes.add(currentOffsetIndexes);
+    currentColumnIndexes = null;
+    currentOffsetIndexes = null;
     currentBlock = null;
   }
 
@@ -613,6 +722,11 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
         length = 0;
       }
 
+      // TODO: column/offset indexes are not copied
+      // (it would require seeking to the end of the file for each row group)
+      currentColumnIndexes.add(null);
+      currentOffsetIndexes.add(null);
+
       currentBlock.addColumn(ColumnChunkMetaData.get(
           chunk.getPath(),
           chunk.getPrimitiveType(),
@@ -679,12 +793,57 @@ private static void copy(SeekableInputStream from, PositionOutputStream to,
    */
   public void end(Map<String, String> extraMetaData) throws IOException {
     state = state.end();
+    serializeColumnIndexes(columnIndexes, blocks, out);
+    serializeOffsetIndexes(offsetIndexes, blocks, out);
     LOG.debug("{}: end", out.getPos());
     this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
     serializeFooter(footer, out);
     out.close();
   }
 
+  private static void serializeColumnIndexes(
+      List<List<ColumnIndex>> columnIndexes,
+      List<BlockMetaData> blocks,
+      PositionOutputStream out) throws IOException {
+    LOG.debug("{}: column indexes", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) { // columnIndexes is indexed like blocks
+      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      List<ColumnIndex> blockColumnIndexes = columnIndexes.get(bIndex);
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        ColumnChunkMetaData column = columns.get(cIndex);
+        org.apache.parquet.format.ColumnIndex columnIndex = ParquetMetadataConverter
+            .toParquetColumnIndex(column.getPrimitiveType(), blockColumnIndexes.get(cIndex));
+        if (columnIndex == null) { // nothing to write; the column chunk keeps no column index reference
+          continue;
+        }
+        long offset = out.getPos(); // position where this column index starts
+        Util.writeColumnIndex(columnIndex, out);
+        column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset))); // length = bytes written
+      }
+    }
+  }
+
+  private static void serializeOffsetIndexes(
+      List<List<OffsetIndex>> offsetIndexes,
+      List<BlockMetaData> blocks,
+      PositionOutputStream out) throws IOException {
+    LOG.debug("{}: offset indexes", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) { // offsetIndexes is indexed like blocks
+      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
+        if (offsetIndex == null) { // nothing to write; the column chunk keeps no offset index reference
+          continue;
+        }
+        ColumnChunkMetaData column = columns.get(cIndex);
+        long offset = out.getPos(); // position where this offset index starts
+        Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out);
+        column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset))); // length = bytes written
+      }
+    }
+  }
+
   private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
     long footerIndex = out.getPos();
     org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index fb94247ed..5f474fdbc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -24,9 +24,11 @@
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Types;
+import org.apache.yetus.audience.InterfaceAudience.Private;
 
 /**
  * Column meta data for a block stored in the file footer and passed in the InputSplit
@@ -168,6 +170,9 @@ protected static boolean positiveLongFitsInAnInt(long value) {
   // we save 3 references by storing together the column properties that have few distinct values
   private final ColumnChunkProperties properties;
 
+  private IndexReference columnIndexReference;
+  private IndexReference offsetIndexReference;
+
   protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
     this(null, columnChunkProperties);
   }
@@ -237,6 +242,40 @@ public PrimitiveType getPrimitiveType() {
    */
   abstract public Statistics getStatistics();
 
+  /**
+   * @return the reference to the column index or {@code null} if none has been set
+   */
+  @Private
+  public IndexReference getColumnIndexReference() {
+    return columnIndexReference;
+  }
+
+  /**
+   * @param indexReference
+   *          the reference to the column index; stored as is, {@code null} included
+   */
+  @Private
+  public void setColumnIndexReference(IndexReference indexReference) {
+    this.columnIndexReference = indexReference;
+  }
+
+  /**
+   * @return the reference to the offset index or {@code null} if none has been set
+   */
+  @Private
+  public IndexReference getOffsetIndexReference() {
+    return offsetIndexReference;
+  }
+
+  /**
+   * @param offsetIndexReference
+   *          the reference to the offset index; stored as is, {@code null} included
+   */
+  @Private
+  public void setOffsetIndexReference(IndexReference offsetIndexReference) {
+    this.offsetIndexReference = offsetIndexReference;
+  }
+
   /**
    * @return all the encodings used in this column
    */
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/internal/hadoop/metadata/IndexReference.java b/parquet-hadoop/src/main/java/org/apache/parquet/internal/hadoop/metadata/IndexReference.java
new file mode 100644
index 000000000..5e02f1efe
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/internal/hadoop/metadata/IndexReference.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.hadoop.metadata;
+
+/**
+ * Reference to an index (either an OffsetIndex or a ColumnIndex) for a row-group. It holds the offset and length
+ * values so the reader can read the referenced data. Instances are immutable: both values are final.
+ */
+public class IndexReference {
+  private final long offset; // where the referenced index starts (presumably a position in the file -- confirm)
+  private final int length; // size of the referenced index (presumably in bytes -- confirm)
+
+  public IndexReference(long offset, int length) { // values are stored as given, no validation is done here
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public int getLength() {
+    return length;
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 6cce32ff9..b9382fc0b 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -23,6 +23,7 @@
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -65,6 +66,11 @@
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 import org.junit.Assert;
@@ -892,4 +898,60 @@ public void testColumnOrders() throws IOException {
     assertEquals(ColumnOrder.undefined(), columns.get(1).getPrimitiveType().columnOrder());
     assertEquals(ColumnOrder.undefined(), columns.get(2).getPrimitiveType().columnOrder());
   }
+
+  @Test
+  public void testOffsetIndexConversion() {
+    OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
+    builder.add(1000, 10000, 0); // page 0: offset, compressed size, first row index (order per the asserts below)
+    builder.add(22000, 12000, 100); // page 1
+    OffsetIndex offsetIndex = ParquetMetadataConverter // round trip: toParquetOffsetIndex, then back again
+        .fromParquetOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(builder.build(100000)));
+    assertEquals(2, offsetIndex.getPageCount());
+    assertEquals(101000, offsetIndex.getOffset(0)); // 1000 + 100000: build(100000) is expected to shift the offsets
+    assertEquals(10000, offsetIndex.getCompressedPageSize(0));
+    assertEquals(0, offsetIndex.getFirstRowIndex(0));
+    assertEquals(122000, offsetIndex.getOffset(1)); // 22000 + 100000
+    assertEquals(12000, offsetIndex.getCompressedPageSize(1));
+    assertEquals(100, offsetIndex.getFirstRowIndex(1));
+  }
+
+  @Test
+  public void testColumnIndexConversion() {
+    PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64");
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    Statistics<?> stats = Statistics.createStats(type); // page 0: 16 nulls, values -100 and 100
+    stats.incrementNumNulls(16);
+    stats.updateStats(-100l);
+    stats.updateStats(100l);
+    builder.add(stats);
+    stats = Statistics.createStats(type); // page 1: 111 nulls and no values, expected to become a null page
+    stats.incrementNumNulls(111);
+    builder.add(stats);
+    stats = Statistics.createStats(type); // page 2: no nulls, values 200 and 500
+    stats.updateStats(200l);
+    stats.updateStats(500l);
+    builder.add(stats);
+    org.apache.parquet.format.ColumnIndex parquetColumnIndex = 
+        ParquetMetadataConverter.toParquetColumnIndex(type, builder.build());
+    ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex);
+    assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); // -100..100, null page, 200..500
+    assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages()));
+    assertTrue(Arrays.asList(16l, 111l, 0l).equals(columnIndex.getNullCounts()));
+    assertTrue(Arrays.asList(
+        ByteBuffer.wrap(BytesUtils.longToBytes(-100l)),
+        ByteBuffer.allocate(0), // the null page is expected to carry an empty min value
+        ByteBuffer.wrap(BytesUtils.longToBytes(200l))).equals(columnIndex.getMinValues()));
+    assertTrue(Arrays.asList(
+        ByteBuffer.wrap(BytesUtils.longToBytes(100l)),
+        ByteBuffer.allocate(0), // the null page is expected to carry an empty max value
+        ByteBuffer.wrap(BytesUtils.longToBytes(500l))).equals(columnIndex.getMaxValues()));
+
+    assertNull("Should handle null column index", ParquetMetadataConverter
+        .toParquetColumnIndex(Types.required(PrimitiveTypeName.INT32).named("test_int32"), null));
+    assertNull("Should ignore unsupported types", ParquetMetadataConverter
+        .toParquetColumnIndex(Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex));
+    assertNull("Should ignore unsupported types",
+        ParquetMetadataConverter.fromParquetColumnIndex(Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+            .length(12).as(OriginalType.INTERVAL).named("test_interval"), parquetColumnIndex));
+  }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index a5381f073..b78726838 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -18,8 +18,13 @@
  */
 package org.apache.parquet.hadoop;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.inOrder;
 import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.column.Encoding.RLE;
@@ -51,13 +56,23 @@
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -66,6 +81,40 @@
 
 public class TestColumnChunkPageWriteStore {
 
+  // OutputFile that delegates to a HadoopOutputFile and exposes the PositionOutputStream handed to the writer
+  private static class OutputFileForTesting implements OutputFile {
+    private PositionOutputStream out; // stream returned by the latest create/createOrOverwrite call; null before
+    private final HadoopOutputFile file; // delegate that every OutputFile call is forwarded to
+
+    OutputFileForTesting(Path path, Configuration conf) throws IOException {
+      file = HadoopOutputFile.fromPath(path, conf);
+    }
+
+    PositionOutputStream out() { // lets the test query the position of the stream the writer is using
+      return out;
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) throws IOException {
+      return out = file.create(blockSizeHint); // remember the stream before handing it to the caller
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+      return out = file.createOrOverwrite(blockSizeHint); // remember the stream before handing it to the caller
+    }
+
+    @Override
+    public boolean supportsBlockSize() {
+      return file.supportsBlockSize();
+    }
+
+    @Override
+    public long defaultBlockSize() {
+      return file.defaultBlockSize();
+    }
+  }
+
   private int pageSize = 1024;
   private int initialSize = 1024;
   private Configuration conf;
@@ -98,11 +147,18 @@ public void test() throws Exception {
     BytesInput data = BytesInput.fromInt(v);
     int rowCount = 5;
     int nullCount = 1;
+    statistics.incrementNumNulls(nullCount);
+    statistics.setMinMaxFromBytes(new byte[] {0, 1, 2}, new byte[] {0, 1, 2, 3});
+    long pageOffset;
+    long pageSize;
 
     {
-      ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
+      OutputFileForTesting outputFile = new OutputFileForTesting(file, conf);
+      ParquetFileWriter writer = new ParquetFileWriter(outputFile, schema, Mode.CREATE,
+          ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
       writer.start();
       writer.startBlock(rowCount);
+      pageOffset = outputFile.out().getPos();
       {
         ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator());
         PageWriter pageWriter = store.getPageWriter(col);
@@ -112,6 +168,7 @@ public void test() throws Exception {
             dataEncoding, data,
             statistics);
         store.flushToFileWriter(writer);
+        pageSize = outputFile.out().getPos() - pageOffset;
       }
       writer.endBlock();
       writer.end(new HashMap<String, String>());
@@ -132,6 +189,20 @@ public void test() throws Exception {
       assertEquals(dataEncoding, page.getDataEncoding());
       assertEquals(v, intValue(page.getData()));
       assertEquals(statistics.toString(), page.getStatistics().toString());
+
+      // Checking column/offset indexes for the one page
+      ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
+      ColumnIndex columnIndex = reader.readColumnIndex(column);
+      assertArrayEquals(statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
+      assertArrayEquals(statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
+      assertEquals(statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
+      assertFalse(columnIndex.getNullPages().get(0));
+      OffsetIndex offsetIndex = reader.readOffsetIndex(column);
+      assertEquals(1, offsetIndex.getPageCount());
+      assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
+      assertEquals(0, offsetIndex.getFirstRowIndex(0));
+      assertEquals(pageOffset, offsetIndex.getOffset(0));
+
       reader.close();
     }
   }
@@ -175,8 +246,20 @@ public void testColumnOrderV1() throws IOException {
     store.flushToFileWriter(mockFileWriter);
 
     for (ColumnDescriptor col : schema.getColumns()) {
-      inOrder.verify(mockFileWriter).startColumn(
-          eq(col), eq((long) fakeCount), eq(UNCOMPRESSED));
+      inOrder.verify(mockFileWriter).writeColumnChunk(
+          eq(col),
+          eq((long) fakeCount),
+          eq(UNCOMPRESSED),
+          isNull(DictionaryPage.class),
+          any(),
+          eq(fakeData.size()),
+          eq(fakeData.size()),
+          eq(fakeStats),
+          same(ColumnIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no column index
+          same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index
+          any(),
+          any(),
+          any());
     }
   }
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 095b575c8..a8de38c38 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
@@ -41,7 +42,11 @@
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.format.Statistics;
 import org.apache.parquet.hadoop.metadata.*;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -51,6 +56,8 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 
 import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
@@ -766,4 +773,135 @@ public void testWriteMetadataFileWithRelativeOutputPath() throws IOException {
     ParquetFileWriter.writeMetadataFile(conf, relativeRoot, footers, JobSummaryLevel.ALL);
   }
 
+  @Test
+  public void testColumnIndexWriteRead() throws Exception {
+    File testFile = temp.newFile();
+    testFile.delete(); // only the unique path is needed; presumably the writer has to create the file itself
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(4); // row group 0: not inspected by the assertions below
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.startBlock(4); // row group 1: the only one whose column/offset indexes are verified
+    w.startColumn(C1, 5, CODEC);
+    long c1p1Starts = w.getPos(); // positions are recorded around each page to verify offsets and sizes later
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), statsC1(null, Binary.fromString("aaa")), 1, BIT_PACKED, BIT_PACKED,
+        PLAIN); // C1 page 0: one null and "aaa"; the argument after the statistics (1) acts as the row count
+    long c1p2Starts = w.getPos();
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), statsC1(Binary.fromString("bbb"), Binary.fromString("ccc")), 3,
+        BIT_PACKED, BIT_PACKED, PLAIN); // C1 page 1: "bbb" and "ccc", no nulls
+    w.endColumn();
+    long c1Ends = w.getPos(); // end of column chunk C1, used for the size of its last page
+    w.startColumn(C2, 6, CODEC);
+    long c2p1Starts = w.getPos(); // start of C2 page 0 (values 117 and 100)
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), statsC2(117l, 100l), 1, BIT_PACKED, BIT_PACKED, PLAIN);
+    long c2p2Starts = w.getPos(); // start of C2 page 1 (three nulls only)
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), statsC2(null, null, null), 2, BIT_PACKED, BIT_PACKED, PLAIN);
+    long c2p3Starts = w.getPos(); // start of C2 page 2 (single value 0)
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), statsC2(0l), 1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c2Ends = w.getPos(); // end of column chunk C2, used for the size of its last page
+    w.endBlock();
+    w.startBlock(4); // row group 2: not inspected by the assertions below
+    w.startColumn(C1, 7, CODEC);
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+
+    try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(path, configuration),
+        ParquetReadOptions.builder().build())) {
+      ParquetMetadata footer = reader.getFooter();
+      assertEquals(3, footer.getBlocks().size());
+      BlockMetaData blockMeta = footer.getBlocks().get(1); // the middle row group
+      assertEquals(2, blockMeta.getColumns().size());
+
+      ColumnIndex columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(0)); // column index of C1
+      assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); // aaa..aaa, then bbb..ccc
+      assertTrue(Arrays.asList(1l, 0l).equals(columnIndex.getNullCounts()));
+      assertTrue(Arrays.asList(false, false).equals(columnIndex.getNullPages()));
+      List<ByteBuffer> minValues = columnIndex.getMinValues();
+      assertEquals(2, minValues.size());
+      List<ByteBuffer> maxValues = columnIndex.getMaxValues();
+      assertEquals(2, maxValues.size());
+      assertEquals("aaa", new String(minValues.get(0).array(), StandardCharsets.UTF_8));
+      assertEquals("aaa", new String(maxValues.get(0).array(), StandardCharsets.UTF_8));
+      assertEquals("bbb", new String(minValues.get(1).array(), StandardCharsets.UTF_8));
+      assertEquals("ccc", new String(maxValues.get(1).array(), StandardCharsets.UTF_8));
+
+      columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(1)); // column index of C2
+      assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder()); // 100..117, null page, then 0..0
+      assertTrue(Arrays.asList(0l, 3l, 0l).equals(columnIndex.getNullCounts()));
+      assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages()));
+      minValues = columnIndex.getMinValues();
+      assertEquals(3, minValues.size());
+      maxValues = columnIndex.getMaxValues();
+      assertEquals(3, maxValues.size());
+      assertEquals(100, BytesUtils.bytesToLong(minValues.get(0).array()));
+      assertEquals(117, BytesUtils.bytesToLong(maxValues.get(0).array()));
+      assertEquals(0, minValues.get(1).array().length); // null page: empty min value
+      assertEquals(0, maxValues.get(1).array().length); // null page: empty max value
+      assertEquals(0, BytesUtils.bytesToLong(minValues.get(2).array()));
+      assertEquals(0, BytesUtils.bytesToLong(maxValues.get(2).array()));
+
+      OffsetIndex offsetIndex = reader.readOffsetIndex(blockMeta.getColumns().get(0)); // offset index of C1
+      assertEquals(2, offsetIndex.getPageCount());
+      assertEquals(c1p1Starts, offsetIndex.getOffset(0));
+      assertEquals(c1p2Starts, offsetIndex.getOffset(1));
+      assertEquals(c1p2Starts - c1p1Starts, offsetIndex.getCompressedPageSize(0));
+      assertEquals(c1Ends - c1p2Starts, offsetIndex.getCompressedPageSize(1)); // last page ends with the chunk
+      assertEquals(0, offsetIndex.getFirstRowIndex(0));
+      assertEquals(1, offsetIndex.getFirstRowIndex(1)); // page 0 was written with a row count of 1
+
+      offsetIndex = reader.readOffsetIndex(blockMeta.getColumns().get(1)); // offset index of C2
+      assertEquals(3, offsetIndex.getPageCount());
+      assertEquals(c2p1Starts, offsetIndex.getOffset(0));
+      assertEquals(c2p2Starts, offsetIndex.getOffset(1));
+      assertEquals(c2p3Starts, offsetIndex.getOffset(2));
+      assertEquals(c2p2Starts - c2p1Starts, offsetIndex.getCompressedPageSize(0));
+      assertEquals(c2p3Starts - c2p2Starts, offsetIndex.getCompressedPageSize(1));
+      assertEquals(c2Ends - c2p3Starts, offsetIndex.getCompressedPageSize(2)); // last page ends with the chunk
+      assertEquals(0, offsetIndex.getFirstRowIndex(0));
+      assertEquals(1, offsetIndex.getFirstRowIndex(1)); // after 1 row in page 0
+      assertEquals(3, offsetIndex.getFirstRowIndex(2)); // after 1 + 2 rows in pages 0 and 1
+    }
+  }
+
+  private org.apache.parquet.column.statistics.Statistics<?> statsC1(Binary... values) {
+    org.apache.parquet.column.statistics.Statistics<?> stats = org.apache.parquet.column.statistics.Statistics
+        .createStats(C1.getPrimitiveType()); // fresh statistics matching the type of column C1
+    for (Binary value : values) {
+      if (value == null) {
+        stats.incrementNumNulls(); // a null element is counted as one null value
+      } else {
+        stats.updateStats(value); // non-null elements are fed into the statistics
+      }
+    }
+    return stats;
+  }
+
+  private org.apache.parquet.column.statistics.Statistics<?> statsC2(Long... values) {
+    org.apache.parquet.column.statistics.Statistics<?> stats = org.apache.parquet.column.statistics.Statistics
+        .createStats(C2.getPrimitiveType()); // fresh statistics matching the type of column C2
+    for (Long value : values) {
+      if (value == null) {
+        stats.incrementNumNulls(); // a null element is counted as one null value
+      } else {
+        stats.updateStats(value); // non-null elements are fed into the statistics
+      }
+    }
+    return stats;
+  }
 }
diff --git a/pom.xml b/pom.xml
index 7b3f36fe5..dbd68bb87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
     <hadoop1.version>1.2.1</hadoop1.version>
     <cascading.version>2.7.1</cascading.version>
     <cascading3.version>3.1.2</cascading3.version>
-    <parquet.format.version>2.4.0</parquet.format.version>
+    <parquet.format.version>2.5.0</parquet.format.version>
     <previous.version>1.7.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
     <scala.version>2.10.6</scala.version>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Column indexes: read/write API
> ------------------------------
>
>                 Key: PARQUET-1211
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1211
>             Project: Parquet
>          Issue Type: Sub-task
>            Reporter: Gabor Szadovszky
>            Assignee: Gabor Szadovszky
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)